You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2018/09/20 05:01:15 UTC

[1/3] incubator-ratis git commit: RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 974919e5e -> ed8e60dad


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
deleted file mode 100644
index d047803..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolService.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
-import org.apache.ratis.util.ProtoUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Supplier;
-
-public class RaftServerProtocolService extends RaftServerProtocolServiceImplBase {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftServerProtocolService.class);
-
-  private final Supplier<RaftPeerId> idSupplier;
-  private final RaftServer server;
-
-  public RaftServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) {
-    this.idSupplier = idSupplier;
-    this.server = server;
-  }
-
-  RaftPeerId getId() {
-    return idSupplier.get();
-  }
-
-  @Override
-  public void requestVote(RequestVoteRequestProto request,
-      StreamObserver<RequestVoteReplyProto> responseObserver) {
-    try {
-      final RequestVoteReplyProto reply = server.requestVote(request);
-      responseObserver.onNext(reply);
-      responseObserver.onCompleted();
-    } catch (Throwable e) {
-      RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e);
-      responseObserver.onError(RaftGrpcUtil.wrapException(e));
-    }
-  }
-
-  @Override
-  public StreamObserver<AppendEntriesRequestProto> appendEntries(
-      StreamObserver<AppendEntriesReplyProto> responseObserver) {
-    return new StreamObserver<AppendEntriesRequestProto>() {
-      private final AtomicReference<CompletableFuture<Void>> previousOnNext =
-          new AtomicReference<>(CompletableFuture.completedFuture(null));
-      private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
-      @Override
-      public void onNext(AppendEntriesRequestProto request) {
-        final CompletableFuture<Void> current = new CompletableFuture<>();
-        final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
-        try {
-          server.appendEntriesAsync(request).thenCombine(previous,
-              (reply, v) -> {
-            if (!isClosed.get()) {
-              responseObserver.onNext(reply);
-            }
-            current.complete(null);
-            return null;
-          });
-        } catch (Throwable e) {
-          RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e);
-          responseObserver.onError(RaftGrpcUtil.wrapException(e, request.getServerRequest().getCallId()));
-          current.completeExceptionally(e);
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        // for now we just log a msg
-        RaftGrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t);
-      }
-
-      @Override
-      public void onCompleted() {
-        if (isClosed.compareAndSet(false, true)) {
-          LOG.info("{}: appendEntries completed", getId());
-          responseObserver.onCompleted();
-        }
-      }
-    };
-  }
-
-  @Override
-  public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseObserver) {
-    return new StreamObserver<InstallSnapshotRequestProto>() {
-      @Override
-      public void onNext(InstallSnapshotRequestProto request) {
-        try {
-          final InstallSnapshotReplyProto reply = server.installSnapshot(request);
-          responseObserver.onNext(reply);
-        } catch (Throwable e) {
-          RaftGrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e);
-          responseObserver.onError(RaftGrpcUtil.wrapException(e));
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        RaftGrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t);
-      }
-
-      @Override
-      public void onCompleted() {
-        LOG.info("{}: installSnapshot completed", getId());
-        responseObserver.onCompleted();
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
deleted file mode 100644
index a2c419f..0000000
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.*;
-import org.apache.ratis.statemachine.StateMachine;
-
-import java.io.IOException;
-
-public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
-  public static final Factory<MiniRaftClusterWithGRpc> FACTORY
-      = new Factory<MiniRaftClusterWithGRpc>() {
-    @Override
-    public MiniRaftClusterWithGRpc newCluster(
-        String[] ids, RaftProperties prop) {
-      RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
-      return new MiniRaftClusterWithGRpc(ids, prop);
-    }
-  };
-
-  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGRpc> {
-    @Override
-    default Factory<MiniRaftClusterWithGRpc> getFactory() {
-      return FACTORY;
-    }
-  }
-
-  public static final DelayLocalExecutionInjection sendServerRequestInjection =
-      new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
-
-  private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) {
-    super(ids, properties, null);
-  }
-
-  @Override
-  protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
-      RaftProperties properties) throws IOException {
-    GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
-    return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null);
-  }
-
-  @Override
-  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
-      throws InterruptedException {
-    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
-        leaderId, delayMs, getMaxTimeout());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
new file mode 100644
index 0000000..176cfa0
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.server.GrpcService;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.impl.*;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
+
+public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
+  public static final Factory<MiniRaftClusterWithGrpc> FACTORY
+      = new Factory<MiniRaftClusterWithGrpc>() {
+    @Override
+    public MiniRaftClusterWithGrpc newCluster(
+        String[] ids, RaftProperties prop) {
+      RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
+      return new MiniRaftClusterWithGrpc(ids, prop);
+    }
+  };
+
+  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
+    @Override
+    default Factory<MiniRaftClusterWithGrpc> getFactory() {
+      return FACTORY;
+    }
+  }
+
+  public static final DelayLocalExecutionInjection sendServerRequestInjection =
+      new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
+
+  private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
+    super(ids, properties, null);
+  }
+
+  @Override
+  protected RaftServerProxy newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftGroup group,
+      RaftProperties properties) throws IOException {
+    GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
+    return ServerImplUtils.newRaftServer(id, group, stateMachine, properties, null);
+  }
+
+  @Override
+  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
+      throws InterruptedException {
+    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
+        leaderId, delayMs, getMaxTimeout());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
index 0b5e2a9..657bfd1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestGroupManagementWithGrpc.java
@@ -23,6 +23,6 @@ import org.apache.ratis.server.impl.GroupManagementBaseTest;
 public class TestGroupManagementWithGrpc extends GroupManagementBaseTest {
   @Override
   public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
-    return MiniRaftClusterWithGRpc.FACTORY;
+    return MiniRaftClusterWithGrpc.FACTORY;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
index a62dab0..eb08336 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestLeaderElectionWithGrpc.java
@@ -22,15 +22,15 @@ import org.apache.ratis.server.impl.LeaderElectionTests;
 import org.junit.Test;
 
 public class TestLeaderElectionWithGrpc
-    extends LeaderElectionTests<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends LeaderElectionTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 
   @Override
   @Test
   public void testEnforceLeader() throws Exception {
     super.testEnforceLeader();
 
-    MiniRaftClusterWithGRpc.sendServerRequestInjection.clear();
+    MiniRaftClusterWithGrpc.sendServerRequestInjection.clear();
     BlockRequestHandlingInjection.getInstance().unblockAll();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
index 752a3dd..614787e 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftAsyncWithGrpc.java
@@ -19,6 +19,6 @@ package org.apache.ratis.grpc;
 
 import org.apache.ratis.RaftAsyncTests;
 
-public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+public class TestRaftAsyncWithGrpc extends RaftAsyncTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
index fc110ea..d2b71bc 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftExceptionWithGrpc.java
@@ -20,6 +20,6 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.RaftExceptionBaseTest;
 
 public class TestRaftExceptionWithGrpc
-    extends RaftExceptionBaseTest<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends RaftExceptionBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
deleted file mode 100644
index 822b923..0000000
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.log4j.Level;
-import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
-import org.apache.ratis.util.LogUtils;
-
-import java.io.IOException;
-
-public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest {
-  static {
-    LogUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
-  }
-
-  @Override
-  public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
-    return MiniRaftClusterWithGRpc.FACTORY.newCluster(peerNum, prop);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
new file mode 100644
index 0000000..29f8bea
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGrpc.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.grpc.server.GrpcServerProtocolService;
+import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
+import org.apache.ratis.util.LogUtils;
+
+import java.io.IOException;
+
+public class TestRaftReconfigurationWithGrpc extends RaftReconfigurationBaseTest {
+  static {
+    LogUtils.setLogLevel(GrpcServerProtocolService.LOG, Level.DEBUG);
+  }
+
+  @Override
+  public MiniRaftClusterWithGrpc getCluster(int peerNum) {
+    return MiniRaftClusterWithGrpc.FACTORY.newCluster(peerNum, prop);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 7173e1f..8a9e94b 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -33,8 +33,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
   @Test
   public void testServerRestartOnException() throws Exception {
     RaftProperties properties = new RaftProperties();
-    final MiniRaftClusterWithGRpc cluster
-        = MiniRaftClusterWithGRpc.FACTORY.newCluster(1, properties);
+    final MiniRaftClusterWithGrpc cluster
+        = MiniRaftClusterWithGrpc.FACTORY.newCluster(1, properties);
     cluster.start();
     RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
     GrpcConfigKeys.Server.setPort(properties, cluster.getLeader().getServerRpc().getInetSocketAddress().getPort());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
index 091277d..a960478 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
@@ -18,14 +18,11 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
 
-import java.io.IOException;
-
 public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
   @Override
   public MiniRaftCluster.Factory<?> getFactory() {
-    return MiniRaftClusterWithGRpc.FACTORY;
+    return MiniRaftClusterWithGrpc.FACTORY;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
index ca36738..c8789a7 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStateMachineExceptionWithGrpc.java
@@ -20,7 +20,7 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.server.impl.RaftStateMachineExceptionTests;
 
 public class TestRaftStateMachineExceptionWithGrpc
-    extends RaftStateMachineExceptionTests<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends RaftStateMachineExceptionTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 17956c7..f3897ac 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -21,8 +21,8 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.client.AppendStreamer;
-import org.apache.ratis.grpc.client.RaftOutputStream;
+import org.apache.ratis.grpc.client.GrpcClientStreamer;
+import org.apache.ratis.grpc.client.GrpcOutputStream;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -51,14 +51,14 @@ import static org.junit.Assert.fail;
 @Ignore
 public class TestRaftStream extends BaseTest {
   static {
-    LogUtils.setLogLevel(AppendStreamer.LOG, Level.ALL);
+    LogUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL);
   }
 
   private static final RaftProperties prop = new RaftProperties();
   private static final int NUM_SERVERS = 3;
   private static final byte[] BYTES = new byte[4];
 
-  private MiniRaftClusterWithGRpc cluster;
+  private MiniRaftClusterWithGrpc cluster;
 
   @After
   public void tearDown() {
@@ -85,12 +85,12 @@ public class TestRaftStream extends BaseTest {
 
     // default 64K is too large for a test
     GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
-    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
 
     cluster.start();
     RaftServerImpl leader = waitForLeader(cluster);
 
-    try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+    try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
         cluster.getGroup(), leader.getId())) {
       for (int i = 0; i < numRequests; i++) { // generate requests
         out.write(toBytes(i));
@@ -124,11 +124,11 @@ public class TestRaftStream extends BaseTest {
     LOG.info("Running testWriteAndFlush");
 
     GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
-    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
     cluster.start();
 
     RaftServerImpl leader = waitForLeader(cluster);
-    RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+    GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
         cluster.getGroup(), leader.getId());
 
     int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
@@ -203,11 +203,11 @@ public class TestRaftStream extends BaseTest {
     LOG.info("Running testWriteWithOffset");
     GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
 
-    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
     cluster.start();
     RaftServerImpl leader = waitForLeader(cluster);
 
-    RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+    GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
         cluster.getGroup(), leader.getId());
 
     byte[] b1 = new byte[ByteValue.BUFFERSIZE / 2];
@@ -261,7 +261,7 @@ public class TestRaftStream extends BaseTest {
     LOG.info("Running testChangeLeader");
 
     GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
-    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
+    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(NUM_SERVERS, prop);
     cluster.start();
     final RaftServerImpl leader = waitForLeader(cluster);
 
@@ -273,7 +273,7 @@ public class TestRaftStream extends BaseTest {
     new Thread(() -> {
       LOG.info("Writer thread starts");
       int count = 0;
-      try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.randomId(),
+      try (GrpcOutputStream out = new GrpcOutputStream(prop, ClientId.randomId(),
           cluster.getGroup(), leader.getId())) {
         while (running.get()) {
           out.write(toBytes(count++));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 16c0f31..2d0af07 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -36,8 +36,8 @@ import java.util.concurrent.CompletableFuture;
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
 public class TestRaftWithGrpc
-    extends RaftBasicTests<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends RaftBasicTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 
   {
     getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
@@ -53,7 +53,7 @@ public class TestRaftWithGrpc
 
   @Test
   public void testRequestTimeout() throws Exception {
-    try(MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS)) {
+    try(MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS)) {
       cluster.start();
       testRequestTimeout(false, cluster, LOG);
     }
@@ -62,7 +62,7 @@ public class TestRaftWithGrpc
   @Test
   public void testUpdateViaHeartbeat() throws Exception {
     LOG.info("Running testUpdateViaHeartbeat");
-    final MiniRaftClusterWithGRpc cluster = newCluster(NUM_SERVERS);
+    final MiniRaftClusterWithGrpc cluster = newCluster(NUM_SERVERS);
     cluster.start();
     waitForLeader(cluster);
     long waitTime = 5000;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index f577a48..30a3f0d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -44,16 +44,16 @@ public class TestRetryCacheWithGrpc extends RetryCacheTests {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
   }
 
-  private final MiniRaftClusterWithGRpc cluster;
+  private final MiniRaftClusterWithGrpc cluster;
 
   public TestRetryCacheWithGrpc() throws IOException {
-    cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(
+    cluster = MiniRaftClusterWithGrpc.FACTORY.newCluster(
         NUM_SERVERS, properties);
     Assert.assertNull(cluster.getLeader());
   }
 
   @Override
-  public MiniRaftClusterWithGRpc getCluster() {
+  public MiniRaftClusterWithGrpc getCluster() {
     return cluster;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
index ef978a1..30be724 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestServerInformationWithGrpc.java
@@ -20,6 +20,6 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.server.impl.ServerInformationBaseTest;
 
 public class TestServerInformationWithGrpc
-    extends ServerInformationBaseTest<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends ServerInformationBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/GRpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto
deleted file mode 100644
index d7e550e..0000000
--- a/ratis-proto-shaded/src/main/proto/GRpc.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-syntax = "proto3";
-option java_package = "org.apache.ratis.shaded.proto.grpc";
-option java_outer_classname = "GRpcProtos";
-option java_generate_equals_and_hash = true;
-package ratis.grpc;
-
-import "Raft.proto";
-
-service RaftClientProtocolService {
-  // A client-to-server RPC to set new raft configuration
-  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
-      returns(ratis.common.RaftClientReplyProto) {}
-
-  // A client-to-server stream RPC to append data
-  rpc append(stream ratis.common.RaftClientRequestProto)
-      returns (stream ratis.common.RaftClientReplyProto) {}
-}
-
-service RaftServerProtocolService {
-  rpc requestVote(ratis.common.RequestVoteRequestProto)
-      returns(ratis.common.RequestVoteReplyProto) {}
-
-  rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
-      returns(stream ratis.common.AppendEntriesReplyProto) {}
-
-  rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
-      returns(ratis.common.InstallSnapshotReplyProto) {}
-}
-
-service AdminProtocolService {
-  // A client-to-server RPC to add a new group
-  rpc groupManagement(ratis.common.GroupManagementRequestProto)
-      returns(ratis.common.RaftClientReplyProto) {}
-
-  rpc serverInformation(ratis.common.ServerInformationRequestProto)
-      returns(ratis.common.ServerInformationReplyProto) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-proto-shaded/src/main/proto/Grpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Grpc.proto b/ratis-proto-shaded/src/main/proto/Grpc.proto
new file mode 100644
index 0000000..5c4bbad
--- /dev/null
+++ b/ratis-proto-shaded/src/main/proto/Grpc.proto
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto3";
+option java_package = "org.apache.ratis.shaded.proto.grpc";
+option java_outer_classname = "GrpcProtos";
+option java_generate_equals_and_hash = true;
+package ratis.grpc;
+
+import "Raft.proto";
+
+service RaftClientProtocolService {
+  // A client-to-server RPC to set new raft configuration
+  rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+
+  // A client-to-server stream RPC to append data
+  rpc append(stream ratis.common.RaftClientRequestProto)
+      returns (stream ratis.common.RaftClientReplyProto) {}
+}
+
+service RaftServerProtocolService {
+  rpc requestVote(ratis.common.RequestVoteRequestProto)
+      returns(ratis.common.RequestVoteReplyProto) {}
+
+  rpc appendEntries(stream ratis.common.AppendEntriesRequestProto)
+      returns(stream ratis.common.AppendEntriesReplyProto) {}
+
+  rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
+      returns(ratis.common.InstallSnapshotReplyProto) {}
+}
+
+service AdminProtocolService {
+  // A client-to-server RPC to add a new group
+  rpc groupManagement(ratis.common.GroupManagementRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+
+  rpc serverInformation(ratis.common.ServerInformationRequestProto)
+      returns(ratis.common.ServerInformationReplyProto) {}
+}



[3/3] incubator-ratis git commit: RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.

Posted by lj...@apache.org.
RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ed8e60da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ed8e60da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ed8e60da

Branch: refs/heads/master
Commit: ed8e60dad83ae733fe6c6c3752e82f1fee72c12e
Parents: 974919e
Author: Lokesh Jain <lj...@apache.org>
Authored: Thu Sep 20 10:29:48 2018 +0530
Committer: Lokesh Jain <lj...@apache.org>
Committed: Thu Sep 20 10:29:48 2018 +0530

----------------------------------------------------------------------
 .../ratis/examples/ParameterizedBaseTest.java   |   6 +-
 .../filestore/TestFileStoreAsyncWithGrpc.java   |   6 +-
 .../filestore/TestFileStoreWithGrpc.java        |   6 +-
 .../java/org/apache/ratis/grpc/GrpcFactory.java |   9 +-
 .../java/org/apache/ratis/grpc/GrpcUtil.java    | 131 ++++++
 .../org/apache/ratis/grpc/RaftGRpcService.java  | 154 -------
 .../org/apache/ratis/grpc/RaftGrpcUtil.java     | 131 ------
 .../ratis/grpc/client/AppendStreamer.java       | 390 -----------------
 .../grpc/client/GrpcClientProtocolClient.java   | 234 ++++++++++
 .../grpc/client/GrpcClientProtocolProxy.java    | 107 +++++
 .../grpc/client/GrpcClientProtocolService.java  | 195 +++++++++
 .../apache/ratis/grpc/client/GrpcClientRpc.java |  14 +-
 .../ratis/grpc/client/GrpcClientStreamer.java   | 390 +++++++++++++++++
 .../ratis/grpc/client/GrpcOutputStream.java     | 112 +++++
 .../grpc/client/RaftClientProtocolClient.java   | 234 ----------
 .../grpc/client/RaftClientProtocolProxy.java    | 107 -----
 .../grpc/client/RaftClientProtocolService.java  | 195 ---------
 .../ratis/grpc/client/RaftOutputStream.java     | 114 -----
 .../ratis/grpc/server/AdminProtocolService.java |  53 ---
 .../ratis/grpc/server/GRpcLogAppender.java      | 438 -------------------
 .../grpc/server/GrpcAdminProtocolService.java   |  53 +++
 .../ratis/grpc/server/GrpcLogAppender.java      | 437 ++++++++++++++++++
 .../grpc/server/GrpcServerProtocolClient.java   |  75 ++++
 .../grpc/server/GrpcServerProtocolService.java  | 134 ++++++
 .../apache/ratis/grpc/server/GrpcService.java   | 152 +++++++
 .../grpc/server/RaftServerProtocolClient.java   |  75 ----
 .../grpc/server/RaftServerProtocolService.java  | 134 ------
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     |  72 ---
 .../ratis/grpc/MiniRaftClusterWithGrpc.java     |  72 +++
 .../ratis/grpc/TestGroupManagementWithGrpc.java |   2 +-
 .../ratis/grpc/TestLeaderElectionWithGrpc.java  |   6 +-
 .../ratis/grpc/TestRaftAsyncWithGrpc.java       |   4 +-
 .../ratis/grpc/TestRaftExceptionWithGrpc.java   |   4 +-
 .../grpc/TestRaftReconfigurationWithGRpc.java   |  36 --
 .../grpc/TestRaftReconfigurationWithGrpc.java   |  36 ++
 .../ratis/grpc/TestRaftServerWithGrpc.java      |   4 +-
 .../ratis/grpc/TestRaftSnapshotWithGrpc.java    |   5 +-
 .../TestRaftStateMachineExceptionWithGrpc.java  |   4 +-
 .../org/apache/ratis/grpc/TestRaftStream.java   |  24 +-
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java |   8 +-
 .../ratis/grpc/TestRetryCacheWithGrpc.java      |   6 +-
 .../grpc/TestServerInformationWithGrpc.java     |   4 +-
 ratis-proto-shaded/src/main/proto/GRpc.proto    |  54 ---
 ratis-proto-shaded/src/main/proto/Grpc.proto    |  54 +++
 44 files changed, 2237 insertions(+), 2244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 057c73a..03f60ec 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -21,7 +21,7 @@ import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
 import org.apache.ratis.hadooprpc.MiniRaftClusterWithHadoopRpc;
 import org.apache.ratis.netty.MiniRaftClusterWithNetty;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
@@ -101,8 +101,8 @@ public abstract class ParameterizedBaseTest extends BaseTest {
     if (isAll || classes.contains(MiniRaftClusterWithSimulatedRpc.class)) {
       add(clusters, MiniRaftClusterWithSimulatedRpc.FACTORY, ids.next(), prop);
     }
-    if (isAll || classes.contains(MiniRaftClusterWithGRpc.class)) {
-      add(clusters, MiniRaftClusterWithGRpc.FACTORY, ids.next(), prop);
+    if (isAll || classes.contains(MiniRaftClusterWithGrpc.class)) {
+      add(clusters, MiniRaftClusterWithGrpc.FACTORY, ids.next(), prop);
     }
     if (isAll || classes.contains(MiniRaftClusterWithNetty.class)) {
       add(clusters, MiniRaftClusterWithNetty.FACTORY, ids.next(), prop);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
index 02bd2b0..d25c85e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreAsyncWithGrpc.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.examples.filestore;
 
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
 
 public class TestFileStoreAsyncWithGrpc
-    extends FileStoreAsyncBaseTest<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends FileStoreAsyncBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
index 71ae294..6e46b6e 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/TestFileStoreWithGrpc.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.examples.filestore;
 
-import org.apache.ratis.grpc.MiniRaftClusterWithGRpc;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
 
 public class TestFileStoreWithGrpc
-    extends FileStoreBaseTest<MiniRaftClusterWithGRpc>
-    implements MiniRaftClusterWithGRpc.FactoryGet {
+    extends FileStoreBaseTest<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 4f2612e..836ee1c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -21,7 +21,8 @@ import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.GrpcClientRpc;
-import org.apache.ratis.grpc.server.GRpcLogAppender;
+import org.apache.ratis.grpc.server.GrpcLogAppender;
+import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
@@ -38,12 +39,12 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
   @Override
   public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
                                     FollowerInfo f) {
-    return new GRpcLogAppender(server, state, f);
+    return new GrpcLogAppender(server, state, f);
   }
 
   @Override
-  public RaftGRpcService newRaftServerRpc(RaftServer server) {
-    return RaftGRpcService.newBuilder()
+  public GrpcService newRaftServerRpc(RaftServer server) {
+    return GrpcService.newBuilder()
         .setServer(server)
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
new file mode 100644
index 0000000..84f01c8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.ServerNotReadyException;
+import org.apache.ratis.shaded.io.grpc.Metadata;
+import org.apache.ratis.shaded.io.grpc.Status;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public interface GrpcUtil {
+  Metadata.Key<String> EXCEPTION_TYPE_KEY =
+      Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
+  Metadata.Key<String> CALL_ID =
+      Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
+
+  static StatusRuntimeException wrapException(Throwable t) {
+    return wrapException(t, -1);
+  }
+
+  static StatusRuntimeException wrapException(Throwable t, long callId) {
+    t = JavaUtils.unwrapCompletionException(t);
+
+    Metadata trailers = new Metadata();
+    trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
+    if (callId > 0) {
+      trailers.put(CALL_ID, String.valueOf(callId));
+    }
+    return new StatusRuntimeException(
+        Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers);
+  }
+
+  static Throwable unwrapThrowable(Throwable t) {
+    if (t instanceof StatusRuntimeException) {
+      final IOException ioe = tryUnwrapException((StatusRuntimeException)t);
+      if (ioe != null) {
+        return ioe;
+      }
+    }
+    return t;
+  }
+
+  static IOException unwrapException(StatusRuntimeException se) {
+    final IOException ioe = tryUnwrapException(se);
+    return ioe != null? ioe: new IOException(se);
+  }
+
+  static IOException tryUnwrapException(StatusRuntimeException se) {
+    final Metadata trailers = se.getTrailers();
+    final Status status = se.getStatus();
+    if (trailers != null && status != null) {
+      final String className = trailers.get(EXCEPTION_TYPE_KEY);
+      if (className != null) {
+        try {
+          Class<?> clazz = Class.forName(className);
+          final Exception unwrapped = ReflectionUtils.instantiateException(
+              clazz.asSubclass(Exception.class), status.getDescription(), se);
+          return IOUtils.asIOException(unwrapped);
+        } catch (Exception e) {
+          se.addSuppressed(e);
+          return new IOException(se);
+        }
+      }
+    }
+    return null;
+  }
+
+  static long getCallId(Throwable t) {
+    if (t instanceof StatusRuntimeException) {
+      final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
+      String callId = trailers.get(CALL_ID);
+      return callId != null ? Integer.parseInt(callId) : -1;
+    }
+    return -1;
+  }
+
+  static IOException unwrapIOException(Throwable t) {
+    final IOException e;
+    if (t instanceof StatusRuntimeException) {
+      e = GrpcUtil.unwrapException((StatusRuntimeException) t);
+    } else {
+      e = IOUtils.asIOException(t);
+    }
+    return e;
+  }
+
+  static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
+      StreamObserver<REPLY_PROTO> responseObserver,
+      CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
+      Function<REPLY, REPLY_PROTO> toProto) {
+    try {
+      supplier.get().whenCompleteAsync((reply, exception) -> {
+        if (exception != null) {
+          responseObserver.onError(GrpcUtil.wrapException(exception));
+        } else {
+          responseObserver.onNext(toProto.apply(reply));
+          responseObserver.onCompleted();
+        }
+      });
+    } catch (Exception e) {
+      responseObserver.onError(GrpcUtil.wrapException(e));
+    }
+  }
+
+  static void warn(Logger log, Supplier<String> message, Throwable t) {
+    LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
deleted file mode 100644
index d638a45..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.grpc.client.RaftClientProtocolService;
-import org.apache.ratis.grpc.server.AdminProtocolService;
-import org.apache.ratis.grpc.server.RaftServerProtocolClient;
-import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
-import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.function.Supplier;
-
-/** A grpc implementation of {@link RaftServerRpc}. */
-public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolClient, PeerProxyMap<RaftServerProtocolClient>> {
-  static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
-  public static final String GRPC_SEND_SERVER_REQUEST =
-      RaftGRpcService.class.getSimpleName() + ".sendRequest";
-
-  public static class Builder extends RaftServerRpc.Builder<Builder,RaftGRpcService> {
-    private Builder() {}
-
-    @Override
-    public Builder getThis() {
-      return this;
-    }
-
-    @Override
-    public RaftGRpcService build() {
-      return new RaftGRpcService(getServer());
-    }
-  }
-
-  public static Builder newBuilder() {
-    return new Builder();
-  }
-
-  private final Server server;
-  private final Supplier<InetSocketAddress> addressSupplier;
-
-  private RaftGRpcService(RaftServer server) {
-    this(server, server::getId,
-        GrpcConfigKeys.Server.port(server.getProperties()),
-        GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
-        GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
-        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
-  }
-  private RaftGRpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
-      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
-      SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) {
-    super(idSupplier, id -> new PeerProxyMap<>(id.toString(),
-        p -> new RaftServerProtocolClient(p, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
-    if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
-      throw new IllegalArgumentException("Illegal configuration: "
-          + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
-          + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
-    }
-
-    server = NettyServerBuilder.forPort(port)
-        .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
-        .flowControlWindow(flowControlWindow.getSizeInt())
-        .addService(new RaftServerProtocolService(idSupplier, raftServer))
-        .addService(new RaftClientProtocolService(idSupplier, raftServer))
-        .addService(new AdminProtocolService(raftServer))
-        .build();
-    addressSupplier = JavaUtils.memoize(() -> new InetSocketAddress(port != 0? port: server.getPort()));
-  }
-
-  @Override
-  public SupportedRpcType getRpcType() {
-    return SupportedRpcType.GRPC;
-  }
-
-  @Override
-  public void startImpl() {
-    try {
-      server.start();
-    } catch (IOException e) {
-      ExitUtils.terminate(1, "Failed to start Grpc server", e, LOG);
-    }
-    LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress());
-  }
-
-  @Override
-  public void closeImpl() throws IOException {
-    final String name = getId() + ": shutdown server with port " + server.getPort();
-    LOG.info("{} now", name);
-    final Server s = server.shutdownNow();
-    super.closeImpl();
-    try {
-      s.awaitTermination();
-    } catch(InterruptedException e) {
-      throw IOUtils.toInterruptedIOException(name + " failed", e);
-    }
-    LOG.info("{} successfully", name);
-  }
-
-  @Override
-  public InetSocketAddress getInetSocketAddress() {
-    return addressSupplier.get();
-  }
-
-  @Override
-  public AppendEntriesReplyProto appendEntries(
-      AppendEntriesRequestProto request) throws IOException {
-    throw new UnsupportedOperationException(
-        "Blocking AppendEntries call is not supported");
-  }
-
-  @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    throw new UnsupportedOperationException(
-        "Blocking InstallSnapshot call is not supported");
-  }
-
-  @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
-      throws IOException {
-    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(),
-        null, request);
-
-    final RaftPeerId target = RaftPeerId.valueOf(request.getServerRequest().getReplyId());
-    return getProxies().getProxy(target).requestVote(request);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
deleted file mode 100644
index ecbbf44..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.ServerNotReadyException;
-import org.apache.ratis.shaded.io.grpc.Metadata;
-import org.apache.ratis.shaded.io.grpc.Status;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-public interface RaftGrpcUtil {
-  Metadata.Key<String> EXCEPTION_TYPE_KEY =
-      Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
-  Metadata.Key<String> CALL_ID =
-      Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
-
-  static StatusRuntimeException wrapException(Throwable t) {
-    return wrapException(t, -1);
-  }
-
-  static StatusRuntimeException wrapException(Throwable t, long callId) {
-    t = JavaUtils.unwrapCompletionException(t);
-
-    Metadata trailers = new Metadata();
-    trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
-    if (callId > 0) {
-      trailers.put(CALL_ID, String.valueOf(callId));
-    }
-    return new StatusRuntimeException(
-        Status.INTERNAL.withCause(t).withDescription(t.getMessage()), trailers);
-  }
-
-  static Throwable unwrapThrowable(Throwable t) {
-    if (t instanceof StatusRuntimeException) {
-      final IOException ioe = tryUnwrapException((StatusRuntimeException)t);
-      if (ioe != null) {
-        return ioe;
-      }
-    }
-    return t;
-  }
-
-  static IOException unwrapException(StatusRuntimeException se) {
-    final IOException ioe = tryUnwrapException(se);
-    return ioe != null? ioe: new IOException(se);
-  }
-
-  static IOException tryUnwrapException(StatusRuntimeException se) {
-    final Metadata trailers = se.getTrailers();
-    final Status status = se.getStatus();
-    if (trailers != null && status != null) {
-      final String className = trailers.get(EXCEPTION_TYPE_KEY);
-      if (className != null) {
-        try {
-          Class<?> clazz = Class.forName(className);
-          final Exception unwrapped = ReflectionUtils.instantiateException(
-              clazz.asSubclass(Exception.class), status.getDescription(), se);
-          return IOUtils.asIOException(unwrapped);
-        } catch (Exception e) {
-          se.addSuppressed(e);
-          return new IOException(se);
-        }
-      }
-    }
-    return null;
-  }
-
-  static long getCallId(Throwable t) {
-    if (t instanceof StatusRuntimeException) {
-      final Metadata trailers = ((StatusRuntimeException)t).getTrailers();
-      String callId = trailers.get(CALL_ID);
-      return callId != null ? Integer.parseInt(callId) : -1;
-    }
-    return -1;
-  }
-
-  static IOException unwrapIOException(Throwable t) {
-    final IOException e;
-    if (t instanceof StatusRuntimeException) {
-      e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
-    } else {
-      e = IOUtils.asIOException(t);
-    }
-    return e;
-  }
-
-  static <REPLY extends RaftClientReply, REPLY_PROTO> void asyncCall(
-      StreamObserver<REPLY_PROTO> responseObserver,
-      CheckedSupplier<CompletableFuture<REPLY>, IOException> supplier,
-      Function<REPLY, REPLY_PROTO> toProto) {
-    try {
-      supplier.get().whenCompleteAsync((reply, exception) -> {
-        if (exception != null) {
-          responseObserver.onError(RaftGrpcUtil.wrapException(exception));
-        } else {
-          responseObserver.onNext(toProto.apply(reply));
-          responseObserver.onCompleted();
-        }
-      });
-    } catch (Exception e) {
-      responseObserver.onError(RaftGrpcUtil.wrapException(e));
-    }
-  }
-
-  static void warn(Logger log, Supplier<String> message, Throwable t) {
-    LogUtils.warn(log, message, unwrapThrowable(t), StatusRuntimeException.class, ServerNotReadyException.class);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
deleted file mode 100644
index 3068751..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-/**
- * Streams client requests to a raft group.
- *
- * Requests accepted by {@link #write} are put into {@code dataQueue}. The {@link Sender}
- * thread hands them to the current leader proxy and moves them to {@code ackQueue};
- * {@link ResponseHandler} removes them from {@code ackQueue} when a successful reply
- * arrives. After an error or a NotLeaderException the unacknowledged requests are moved
- * back to the front of {@code dataQueue} and sent to another peer.
- *
- * Writers, the sender thread and the response handlers coordinate through the monitor of
- * this object ({@code synchronized} / {@code wait} / {@code notifyAll}).
- */
-public class AppendStreamer implements Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);
-
-  /**
-   * State of the streamer. RUNNING and LOOK_FOR_LEADER both count as running (see
-   * isRunning()); while in LOOK_FOR_LEADER the sender thread does not send. CLOSED is set
-   * by close(), ERROR by handleError() once no more retries are allowed.
-   */
-  enum RunningState {RUNNING, LOOK_FOR_LEADER, CLOSED, ERROR}
-
-  /** Keeps the last exception seen per peer and counts errors against the configured limit. */
-  private static class ExceptionAndRetry {
-    private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
-    private final AtomicInteger retryTimes = new AtomicInteger(0);
-    private final int maxRetryTimes;
-    private final TimeDuration retryInterval;
-
-    ExceptionAndRetry(RaftProperties prop) {
-      maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
-      retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
-    }
-
-    /** Records {@code e} for {@code peer}, replacing any earlier entry, and bumps the error count. */
-    void addException(RaftPeerId peer, IOException e) {
-      exceptionMap.put(peer, e);
-      retryTimes.incrementAndGet();
-    }
-
-    /** @return a new IOException whose message contains the recorded per-peer exceptions. */
-    IOException getCombinedException() {
-      return new IOException("Exceptions: " + exceptionMap);
-    }
-
-    /** @return true while the error count has not exceeded {@code maxRetryTimes}. */
-    boolean shouldRetry() {
-      return retryTimes.get() <= maxRetryTimes;
-    }
-  }
-
-  // Requests accepted by write() that have not been handed to the leader proxy yet.
-  private final Deque<RaftClientRequestProto> dataQueue;
-  // Requests already sent and still waiting for a successful reply, oldest at the head.
-  private final Deque<RaftClientRequestProto> ackQueue;
-  // Bound applied to dataQueue in write() and to ackQueue in Sender#shouldWait().
-  private final int maxPendingNum;
-  // Largest serialized request size accepted by write().
-  private final SizeInBytes maxMessageSize;
-
-  private final PeerProxyMap<RaftClientProtocolProxy> proxyMap;
-  private final Map<RaftPeerId, RaftPeer> peers;
-  private RaftPeerId leaderId;
-  private volatile RaftClientProtocolProxy leaderProxy;
-  private final ClientId clientId;
-
-  private volatile RunningState running = RunningState.RUNNING;
-  private final ExceptionAndRetry exceptionAndRetry;
-  private final Sender senderThread;
-  private final RaftGroupId groupId;
-
-  /**
-   * Creates the streamer, sets up one proxy per peer of {@code group}, selects the initial
-   * leader proxy and starts the sender thread.
-   *
-   * @param prop configuration for queue bounds, message size and retry policy
-   * @param group the raft group to stream to
-   * @param leaderId the peer to try first; if null the first key of the peer map is used
-   * @param clientId the id of this client
-   */
-  AppendStreamer(RaftProperties prop, RaftGroup group,
-      RaftPeerId leaderId, ClientId clientId) {
-    this.clientId = clientId;
-    maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
-    maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
-    dataQueue = new ConcurrentLinkedDeque<>();
-    ackQueue = new ConcurrentLinkedDeque<>();
-    exceptionAndRetry = new ExceptionAndRetry(prop);
-
-    this.groupId = group.getGroupId();
-    this.peers = group.getPeers().stream().collect(
-        Collectors.toMap(RaftPeer::getId, Function.identity()));
-    proxyMap = new PeerProxyMap<>(clientId.toString(),
-        raftPeer -> new RaftClientProtocolProxy(clientId, raftPeer, ResponseHandler::new,
-            prop));
-    proxyMap.addPeers(group.getPeers());
-    refreshLeaderProxy(leaderId, null);
-
-    senderThread = new Sender();
-    senderThread.setName(this.toString() + "-sender");
-    senderThread.start();
-  }
-
-  /**
-   * Chooses the peer to treat as leader and switches {@code leaderProxy} to it, closing the
-   * session of the previous proxy first.
-   *
-   * @param suggested the leader to use; if null a peer is picked from {@code peers}
-   * @param oldLeader the previous leader, or null when there was none
-   */
-  private synchronized void refreshLeaderProxy(RaftPeerId suggested,
-      RaftPeerId oldLeader) {
-    if (suggested != null) {
-      leaderId = suggested;
-    } else {
-      if (oldLeader == null) {
-        leaderId = peers.keySet().iterator().next();
-      } else {
-        // presumably picks a random peer other than oldLeader -- confirm against CollectionUtils
-        leaderId = CollectionUtils.random(oldLeader, peers.keySet());
-        if (leaderId == null) {
-          leaderId = oldLeader;
-        }
-      }
-    }
-    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
-          oldLeader, leaderId, suggested);
-    if (leaderProxy != null) {
-      leaderProxy.closeCurrentSession();
-    }
-    try {
-      leaderProxy = proxyMap.getProxy(leaderId);
-    } catch (IOException e) {
-      LOG.error("Should not hit IOException here", e);
-      // NOTE(review): refreshLeader() calls back into this method and also requires a
-      // non-null dataQueue.poll(); if this path is reached with an empty dataQueue (e.g.
-      // from the constructor) it looks like it would throw -- confirm.
-      refreshLeader(null, leaderId);
-    }
-  }
-
-  /** @return true if the state is RUNNING or LOOK_FOR_LEADER. */
-  private boolean isRunning() {
-    return running == RunningState.RUNNING ||
-        running == RunningState.LOOK_FOR_LEADER;
-  }
-
-  /** Throws (via throwException) if the streamer is no longer running. */
-  private void checkState() throws IOException {
-    if (!isRunning()) {
-      throwException("The AppendStreamer has been closed");
-    }
-  }
-
-  /**
-   * Queues {@code content} for sending, blocking while {@code dataQueue} already holds
-   * {@code maxPendingNum} or more requests.
-   *
-   * @param content the payload of the request
-   * @param seqNum the number passed to the request builder
-   * @throws IOException if the streamer is not running, or if the serialized request is
-   *         larger than {@code maxMessageSize}
-   */
-  synchronized void write(ByteString content, long seqNum)
-      throws IOException {
-    checkState();
-    while (isRunning() && dataQueue.size() >= maxPendingNum) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-        // NOTE(review): the interrupt is dropped here and the loop keeps waiting -- confirm
-        // this is intended.
-      }
-    }
-    if (isRunning()) {
-      // wrap the current buffer into a RaftClientRequestProto
-      // seqNum is passed for two parameters -- presumably call id and sequence number;
-      // confirm against ClientProtoUtils
-      final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
-          clientId, leaderId, groupId, seqNum, seqNum, content);
-      if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
-        throw new IOException("msg size:" + request.getSerializedSize() +
-            " exceeds maximum:" + maxMessageSize.getSizeInt());
-      }
-      dataQueue.offer(request);
-      // wake up the sender thread waiting for data
-      this.notifyAll();
-    } else {
-      throwException(this + " got closed.");
-    }
-  }
-
-  /**
-   * Blocks until both queues are empty or the streamer stops running.
-   *
-   * @throws IOException if the streamer is not running on entry, or stops running while
-   *         requests are still queued or unacknowledged
-   */
-  synchronized void flush() throws IOException {
-    checkState();
-    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
-      return;
-    }
-    // wait for the pending Q to become empty
-    while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      try {
-        wait();
-      } catch (InterruptedException ignored) {
-        // NOTE(review): the interrupt is dropped here and the loop keeps waiting -- confirm
-        // this is intended.
-      }
-    }
-    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
-      throwException(this + " got closed before finishing flush");
-    }
-  }
-
-  /**
-   * Flushes outstanding requests, marks the streamer CLOSED, stops the sender thread and
-   * closes all proxies. Does nothing if the streamer is already not running.
-   *
-   * NOTE(review): the state check and the transition to CLOSED happen outside the monitor;
-   * only flush() is synchronized -- confirm concurrent close() calls are not expected.
-   */
-  @Override
-  public void close() throws IOException {
-    if (!isRunning()) {
-      return;
-    }
-    flush();
-
-    running = RunningState.CLOSED;
-    // the interrupt wakes the sender from wait() so that it re-checks isRunning()
-    senderThread.interrupt();
-    try {
-      senderThread.join();
-    } catch (InterruptedException ignored) {
-    }
-    proxyMap.close();
-  }
-
-  @Override
-  public String toString() {
-    return this.getClass().getSimpleName() + "-" + clientId;
-  }
-
-  /** Thread that moves requests from {@code dataQueue} to the leader proxy and {@code ackQueue}. */
-  private class Sender extends Daemon {
-    @Override
-    public void run() {
-      while (isRunning()) {
-
-        synchronized (AppendStreamer.this) {
-          while (isRunning() && shouldWait()) {
-            try {
-              AppendStreamer.this.wait();
-            } catch (InterruptedException ignored) {
-              // the loop condition is re-evaluated; close() relies on this to stop the thread
-            }
-          }
-          // the wait loop can also end because the streamer stopped running, so only send
-          // in the RUNNING state
-          if (running == RunningState.RUNNING) {
-            Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty");
-            RaftClientRequestProto next = dataQueue.poll();
-            leaderProxy.onNext(next);
-            ackQueue.offer(next);
-          }
-        }
-      }
-    }
-
-    private boolean shouldWait() {
-      // the sender should wait if any of the following is true
-      // 1) there is no data to send
-      // 2) there are too many outstanding pending requests
-      // 3) Error/NotLeaderException just happened, we're still waiting for
-      //    the first response to confirm the new leader
-      return dataQueue.isEmpty() || ackQueue.size() >= maxPendingNum ||
-          running == RunningState.LOOK_FOR_LEADER;
-    }
-  }
-
-  /** the response handler for stream RPC */
-  private class ResponseHandler implements
-      RaftClientProtocolProxy.CloseableStreamObserver {
-    private final RaftPeerId targetId;
-    // once handled the first NotLeaderException or Error, the handler should
-    // be inactive and should not make any further action.
-    private volatile boolean active = true;
-
-    ResponseHandler(RaftPeer target) {
-      targetId = target.getId();
-    }
-
-    @Override
-    public String toString() {
-      return AppendStreamer.this + "-ResponseHandler-" + targetId;
-    }
-
-    /**
-     * Handles one reply. A successful reply must match the call id at the head of
-     * {@code ackQueue} and removes that entry; a failed reply carrying a
-     * NotLeaderException triggers a leader change. A failed reply without a
-     * NotLeaderException only wakes up the waiting threads.
-     */
-    @Override
-    public void onNext(RaftClientReplyProto reply) {
-      if (!active) {
-        return;
-      }
-      synchronized (AppendStreamer.this) {
-        // throws NullPointerException if a reply arrives while ackQueue is empty
-        RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek());
-        if (reply.getRpcReply().getSuccess()) {
-          Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(),
-              () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply));
-          ackQueue.poll();
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending));
-          }
-          // we've identified the correct leader
-          if (running == RunningState.LOOK_FOR_LEADER) {
-            running = RunningState.RUNNING;
-          }
-        } else {
-          // this may be a NotLeaderException
-          RaftClientReply r = ClientProtoUtils.toRaftClientReply(reply);
-          final NotLeaderException nle = r.getNotLeaderException();
-          if (nle != null) {
-            LOG.debug("{} received a NotLeaderException from {}", this,
-                r.getServerId());
-            handleNotLeader(nle, targetId);
-          }
-        }
-        AppendStreamer.this.notifyAll();
-      }
-    }
-
-    /** Logs the error and, if this handler is still active, runs the error handling under the monitor. */
-    @Override
-    public void onError(Throwable t) {
-      LOG.warn(this + " onError", t);
-      if (active) {
-        synchronized (AppendStreamer.this) {
-          handleError(t, this);
-          AppendStreamer.this.notifyAll();
-        }
-      }
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} onCompleted, pending requests #: {}", this,
-          ackQueue.size());
-    }
-
-    @Override // called by handleError and handleNotLeader
-    public void close() throws IOException {
-      active = false;
-    }
-  }
-
-  /**
-   * Always throws: the combined per-peer exception when the state is ERROR, otherwise a
-   * new IOException with {@code msg}.
-   */
-  private void throwException(String msg) throws IOException {
-    if (running == RunningState.ERROR) {
-      throw exceptionAndRetry.getCombinedException();
-    } else {
-      throw new IOException(msg);
-    }
-  }
-
-  /** Adds the peers reported by {@code nle} and switches to its suggested leader. Requires the monitor. */
-  private void handleNotLeader(NotLeaderException nle,
-      RaftPeerId oldLeader) {
-    Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this));
-    // handle NotLeaderException: refresh leader and RaftConfiguration
-    refreshPeers(nle.getPeers());
-
-    // NOTE(review): getSuggestedLeader() is dereferenced without a null check -- confirm it
-    // can never be null.
-    refreshLeader(nle.getSuggestedLeader().getId(), oldLeader);
-  }
-
-  /**
-   * Records the error for the handler's peer, then either retries with another peer or
-   * moves to the ERROR state when no more retries are allowed. Requires the monitor.
-   */
-  private void handleError(Throwable t, ResponseHandler handler) {
-    Preconditions.assertTrue(Thread.holdsLock(AppendStreamer.this));
-    final IOException e = RaftGrpcUtil.unwrapIOException(t);
-
-    exceptionAndRetry.addException(handler.targetId, e);
-    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
-        handler, e, exceptionAndRetry.retryTimes.get(),
-        exceptionAndRetry.maxRetryTimes);
-
-    leaderProxy.onError();
-    if (exceptionAndRetry.shouldRetry()) {
-      refreshLeader(null, leaderId);
-    } else {
-      running = RunningState.ERROR;
-    }
-  }
-
-  /**
-   * Enters LOOK_FOR_LEADER, switches the leader proxy, re-queues all unacknowledged
-   * requests and sends the first queued request to the new leader after sleeping for the
-   * retry interval.
-   *
-   * NOTE(review): handleError/handleNotLeader call this with the monitor held, so the
-   * sleep presumably blocks writers and the sender for that long -- confirm.
-   */
-  private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) {
-    running = RunningState.LOOK_FOR_LEADER;
-    refreshLeaderProxy(suggestedLeader, oldLeader);
-    reQueuePendingRequests(leaderId);
-
-    // throws NullPointerException if there is nothing queued to probe the new leader with
-    final RaftClientRequestProto request = Objects.requireNonNull(
-        dataQueue.poll());
-    ackQueue.offer(request);
-    try {
-      exceptionAndRetry.retryInterval.sleep();
-    } catch (InterruptedException ignored) {
-    }
-    leaderProxy.onNext(request);
-  }
-
-  /**
-   * Moves every request from {@code ackQueue} back to the front of {@code dataQueue}, with
-   * its reply id set to {@code newLeader}. Taking from the tail and inserting at the head
-   * keeps the original order.
-   */
-  private void reQueuePendingRequests(RaftPeerId newLeader) {
-    if (isRunning()) {
-      // resend all the pending requests
-      while (!ackQueue.isEmpty()) {
-        final RaftClientRequestProto oldRequest = ackQueue.pollLast();
-        final RaftRpcRequestProto.Builder newRpc = RaftRpcRequestProto.newBuilder(oldRequest.getRpcRequest())
-            .setReplyId(newLeader.toByteString());
-        final RaftClientRequestProto newRequest = RaftClientRequestProto.newBuilder(oldRequest)
-            .setRpcRequest(newRpc).build();
-        dataQueue.offerFirst(newRequest);
-      }
-    }
-  }
-
-  /** Adds the given peers to {@code peers} and {@code proxyMap}; a null or empty array is ignored. */
-  private void refreshPeers(RaftPeer[] newPeers) {
-    if (newPeers != null && newPeers.length > 0) {
-      // we only add new peers, we do not remove any peer even if it no longer
-      // belongs to the current raft conf
-      Arrays.stream(newPeers).forEach(peer -> {
-        peers.putIfAbsent(peer.getId(), peer);
-        proxyMap.computeIfAbsent(peer);
-      });
-
-      LOG.debug("refreshed peers: {}", peers);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
new file mode 100644
index 0000000..a2e53bf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.netty.NegotiationType;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
+import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * gRPC client endpoint for a single raft peer.
+ *
+ * It owns the channel to the peer and offers blocking admin/configuration calls, raw
+ * streaming append calls, and {@link AsyncStreamObservers} which matches streamed replies
+ * to futures by call id.
+ */
+public class GrpcClientProtocolClient implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolClient.class);
+
+  private final Supplier<String> name;
+  private final RaftPeer target;
+  private final ManagedChannel channel;
+
+  // Deadline for blocking calls and timeout for async requests.
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+
+  private final RaftClientProtocolServiceBlockingStub blockingStub;
+  private final RaftClientProtocolServiceStub asyncStub;
+  private final AdminProtocolServiceBlockingStub adminBlockingStub;
+
+  // Created on first use by getAppendStreamObservers(); reset to null when the stream ends.
+  private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+
+  /**
+   * Builds a plaintext Netty channel to {@code target.getAddress()} using the configured
+   * flow-control window and maximum message size, and creates the stubs on it.
+   *
+   * @param id the id of the client, used in the name of this object
+   * @param target the peer to connect to
+   * @param properties the configuration to read sizes and the request timeout from
+   */
+  public GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) {
+    this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
+    this.target = target;
+
+    final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
+    final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
+    channel = NettyChannelBuilder.forTarget(target.getAddress())
+        .negotiationType(NegotiationType.PLAINTEXT)
+        .flowControlWindow(flowControlWindow.getSizeInt())
+        .maxInboundMessageSize(maxMessageSize.getSizeInt())
+        .build();
+    blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
+    asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
+    adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
+    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
+  }
+
+  String getName() {
+    return name.get();
+  }
+
+  /**
+   * Closes the async stream observers, if any exist, and shuts the channel down immediately.
+   *
+   * NOTE(review): the scheduler field is not released here -- confirm whether
+   * TimeoutScheduler needs an explicit shutdown.
+   */
+  @Override
+  public void close() {
+    final AsyncStreamObservers observers = appendStreamObservers.get();
+    if (observers != null) {
+      observers.close();
+    }
+    channel.shutdownNow();
+  }
+
+  /**
+   * Sends a group management request, using the request timeout as the deadline.
+   *
+   * @throws IOException the unwrapped form of a gRPC status failure
+   */
+  RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
+    return blockingCall(() -> adminBlockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .groupManagement(request));
+  }
+
+  /**
+   * Sends a server information request, using the request timeout as the deadline.
+   * Unlike groupAdd and setConfiguration this call does not go through blockingCall, so a
+   * StatusRuntimeException is not converted here.
+   */
+  ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) {
+    return adminBlockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .serverInformation(request);
+  }
+
+  /**
+   * Sends a set-configuration request, using the request timeout as the deadline.
+   *
+   * @throws IOException the unwrapped form of a gRPC status failure
+   */
+  RaftClientReplyProto setConfiguration(
+      SetConfigurationRequestProto request) throws IOException {
+    return blockingCall(() -> blockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .setConfiguration(request));
+  }
+
+  /** Runs the supplier and rethrows a StatusRuntimeException as the result of GrpcUtil.unwrapException. */
+  private static RaftClientReplyProto blockingCall(
+      CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
+      ) throws IOException {
+    try {
+      return supplier.get();
+    } catch (StatusRuntimeException e) {
+      throw GrpcUtil.unwrapException(e);
+    }
+  }
+
+  /** Opens an append stream without a deadline; replies are delivered to {@code responseHandler}. */
+  StreamObserver<RaftClientRequestProto> append(
+      StreamObserver<RaftClientReplyProto> responseHandler) {
+    return asyncStub.append(responseHandler);
+  }
+
+  /** Opens an append stream whose deadline is the request timeout. */
+  StreamObserver<RaftClientRequestProto> appendWithTimeout(
+      StreamObserver<RaftClientReplyProto> responseHandler) {
+    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .append(responseHandler);
+  }
+
+  /**
+   * Returns the current stream observers, creating them if none are set.
+   *
+   * NOTE(review): AtomicReference.updateAndGet may apply its function more than once under
+   * contention, and constructing AsyncStreamObservers opens a stream via append(); an
+   * instance created by a retried attempt would not be closed -- confirm whether callers
+   * can race here.
+   */
+  AsyncStreamObservers getAppendStreamObservers() {
+    return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+  }
+
+  public RaftPeer getTarget() {
+    return target;
+  }
+
+  /**
+   * One append stream together with the futures of the requests sent on it. Each request
+   * gets a future keyed by its call id; the future is completed by the matching reply, by
+   * the request timeout, or exceptionally when the stream ends.
+   */
+  class AsyncStreamObservers implements Closeable {
+    /** Request map: callId -> future */
+    // The reference is set to null once the stream has ended; see completeReplyExceptionally.
+    private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+    private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
+      @Override
+      public void onNext(RaftClientReplyProto proto) {
+        final long callId = proto.getRpcReply().getCallId();
+        try {
+          final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+          final NotLeaderException nle = reply.getNotLeaderException();
+          if (nle != null) {
+            // fails every outstanding request, not only this call id, and detaches this stream
+            completeReplyExceptionally(nle, NotLeaderException.class.getName());
+            return;
+          }
+          handleReplyFuture(callId, f -> f.complete(reply));
+        } catch (Throwable t) {
+          // a failure while handling this reply fails only the future of this call id
+          handleReplyFuture(callId, f -> f.completeExceptionally(t));
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        final IOException ioe = GrpcUtil.unwrapIOException(t);
+        completeReplyExceptionally(ioe, "onError");
+      }
+
+      @Override
+      public void onCompleted() {
+        // null makes completeReplyExceptionally create one IOException per outstanding request
+        completeReplyExceptionally(null, "completed");
+      }
+    };
+    private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+
+    /**
+     * Sends a request on the stream and schedules its timeout check.
+     *
+     * @return a future for the reply; it is already failed if the stream has ended
+     */
+    CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+      if (map == null) {
+        return JavaUtils.completeExceptionally(new IOException("Already closed."));
+      }
+      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+      // presumably rejects a call id that is already in the map -- confirm against CollectionUtils
+      CollectionUtils.putNew(request.getCallId(), f, map,
+          () -> getName() + ":" + getClass().getSimpleName());
+      try {
+        requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+        scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
+            () -> "Timeout check failed for client request: " + request);
+      } catch(Throwable t) {
+        handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
+      }
+      return f;
+    }
+
+    /** Fails the future of {@code request} with a timeout error if it is still in the map. */
+    private void timeoutCheck(RaftClientRequest request) {
+      handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
+          new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
+    }
+
+    /**
+     * Removes the future of {@code callId} from the map and passes it to {@code handler}.
+     * Does nothing if the stream has ended or the call id is not in the map.
+     */
+    private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
+      Optional.ofNullable(replies.get())
+          .map(replyMap -> replyMap.remove(callId))
+          .ifPresent(handler);
+    }
+
+    /** Completes the request stream and fails every outstanding request. */
+    @Override
+    public void close() {
+      requestStreamObserver.onCompleted();
+      completeReplyExceptionally(null, "close");
+    }
+
+    /**
+     * Ends this stream: detaches this instance from the enclosing client, so that the next
+     * getAppendStreamObservers() call creates a new one, and fails every future that is
+     * not done yet. Only the first call has an effect on the futures.
+     *
+     * @param t the failure to use, or null to create an IOException per request
+     * @param event a word describing the cause, used in the created exception message
+     */
+    private void completeReplyExceptionally(Throwable t, String event) {
+      appendStreamObservers.compareAndSet(this, null);
+      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+      if (map == null) {
+        return;
+      }
+      for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
+        final CompletableFuture<RaftClientReply> f = entry.getValue();
+        if (!f.isDone()) {
+          f.completeExceptionally(t != null? t
+              : new IOException(getName() + ": Stream " + event
+                  + ": no reply for async request cid=" + entry.getKey()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
new file mode 100644
index 0000000..156e6c3
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.RaftPeer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Wraps a {@link GrpcClientProtocolClient} and manages at most one streaming session on
+ * it. The session is opened on the first request and closed explicitly or when the proxy
+ * is closed.
+ */
+public class GrpcClientProtocolProxy implements Closeable {
+  private final GrpcClientProtocolClient proxy;
+  private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
+  private RpcSession currentSession;
+
+  /**
+   * @param clientId the id of the client
+   * @param target the peer this proxy talks to
+   * @param responseHandlerCreation creates the reply handler of each new session
+   * @param properties the configuration passed to the underlying client
+   */
+  public GrpcClientProtocolProxy(ClientId clientId, RaftPeer target,
+      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
+      RaftProperties properties) {
+    this.responseHandlerCreation = responseHandlerCreation;
+    this.proxy = new GrpcClientProtocolClient(clientId, target, properties);
+  }
+
+  /** Closes the current session, if any, and then the underlying client. */
+  @Override
+  public void close() throws IOException {
+    closeCurrentSession();
+    proxy.close();
+  }
+
+  @Override
+  public String toString() {
+    return "ProxyTo:" + proxy.getTarget();
+  }
+
+  /** Closes and forgets the current session; does nothing when there is none. */
+  public void closeCurrentSession() {
+    final RpcSession session = currentSession;
+    if (session == null) {
+      return;
+    }
+    session.close();
+    currentSession = null;
+  }
+
+  /** Sends a request on the current session, opening a new session first if there is none. */
+  public void onNext(RaftClientRequestProto request) {
+    if (currentSession == null) {
+      final CloseableStreamObserver handler = responseHandlerCreation.apply(proxy.getTarget());
+      currentSession = new RpcSession(handler);
+    }
+    currentSession.requestObserver.onNext(request);
+  }
+
+  /** Marks the current session, if any, as failed. */
+  public void onError() {
+    final RpcSession session = currentSession;
+    if (session != null) {
+      session.onError();
+    }
+  }
+
+  /** A reply observer that can also be closed. */
+  public interface CloseableStreamObserver
+      extends StreamObserver<RaftClientReplyProto>, Closeable {
+  }
+
+  /** One append stream: the request observer plus the handler receiving its replies. */
+  class RpcSession implements Closeable {
+    private final StreamObserver<RaftClientRequestProto> requestObserver;
+    private final CloseableStreamObserver responseHandler;
+    private boolean hasError = false;
+
+    RpcSession(CloseableStreamObserver responseHandler) {
+      this.requestObserver = proxy.append(responseHandler);
+      this.responseHandler = responseHandler;
+    }
+
+    void onError() {
+      hasError = true;
+    }
+
+    /**
+     * Completes the request stream unless the session was marked as failed, then closes
+     * the response handler. Failures of either step are ignored.
+     */
+    @Override
+    public void close() {
+      final boolean completeStream = !hasError;
+      if (completeStream) {
+        try {
+          requestObserver.onCompleted();
+        } catch (Exception ignored) {
+        }
+      }
+      try {
+        responseHandler.close();
+      } catch (IOException ignored) {
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
new file mode 100644
index 0000000..22f7f56
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SlidingWindow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
+
+  /**
+   * A client request kept in the sliding window until its reply is set.
+   * The instance built with a null request serves as the COMPLETED marker.
+   */
+  private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
+    private final RaftClientRequest request;
+    private volatile RaftClientReply reply;
+
+    PendingAppend(RaftClientRequest request) {
+      this.request = request;
+    }
+
+    /** True once a reply has been set; always true for the COMPLETED marker. */
+    @Override
+    public boolean hasReply() {
+      if (this == COMPLETED) {
+        return true;
+      }
+      return reply != null;
+    }
+
+    @Override
+    public void setReply(RaftClientReply reply) {
+      this.reply = reply;
+    }
+
+    RaftClientReply getReply() {
+      return reply;
+    }
+
+    RaftClientRequest getRequest() {
+      return request;
+    }
+
+    /** The sequence number of the request, or Long.MAX_VALUE when there is no request. */
+    @Override
+    public long getSeqNum() {
+      if (request == null) {
+        return Long.MAX_VALUE;
+      }
+      return request.getSeqNum();
+    }
+
+    @Override
+    public String toString() {
+      if (request == null) {
+        return "COMPLETED";
+      }
+      return getSeqNum() + ":" + reply;
+    }
+  }
+  private static final PendingAppend COMPLETED = new PendingAppend(null);
+
+  private final Supplier<RaftPeerId> idSupplier;
+  private final RaftClientAsynchronousProtocol protocol;
+
+  /**
+   * @param idSupplier supplies the id returned by {@link #getId()}
+   * @param protocol the asynchronous protocol that client requests are submitted to
+   */
+  public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
+    this.protocol = protocol;
+    this.idSupplier = idSupplier;
+  }
+
+  /** Returns whatever id the supplier currently provides. */
+  RaftPeerId getId() {
+    final RaftPeerId id = idSupplier.get();
+    return id;
+  }
+
+  /**
+   * Converts the proto to a SetConfigurationRequest, submits it asynchronously
+   * and reports the converted reply through {@code responseObserver}.
+   */
+  @Override
+  public void setConfiguration(SetConfigurationRequestProto proto,
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final SetConfigurationRequest setConf = ClientProtoUtils.toSetConfigurationRequest(proto);
+    GrpcUtil.asyncCall(
+        responseObserver,
+        () -> protocol.setConfigurationAsync(setConf),
+        ClientProtoUtils::toRaftClientReplyProto);
+  }
+
+  /** Opens a new append stream whose replies are written to {@code responseObserver}. */
+  @Override
+  public StreamObserver<RaftClientRequestProto> append(
+      StreamObserver<RaftClientReplyProto> responseObserver) {
+    final AppendRequestStreamObserver requests = new AppendRequestStreamObserver(responseObserver);
+    return requests;
+  }
+
+  private final AtomicInteger streamCount = new AtomicInteger();
+
+  /**
+   * Receives the requests of one append stream, passes them through a sliding
+   * window and writes the replies to the response observer.
+   */
+  private class AppendRequestStreamObserver implements
+      StreamObserver<RaftClientRequestProto> {
+    // name must be initialized before slidingWindow, which is constructed with it
+    private final String name = getId() + "-" +  streamCount.getAndIncrement();
+    private final StreamObserver<RaftClientReplyProto> responseObserver;
+    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+        = new SlidingWindow.Server<>(name, COMPLETED);
+    private final AtomicBoolean isClosed;
+
+    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
+      LOG.debug("new AppendRequestStreamObserver {}", name);
+      this.responseObserver = ro;
+      this.isClosed = new AtomicBoolean(false);
+    }
+
+    /**
+     * Submits the pending request; its reply is handed to the sliding window,
+     * and a failure of the returned future is reported via responseError.
+     */
+    void processClientRequestAsync(PendingAppend pending) {
+      try {
+        protocol.submitClientRequestAsync(pending.getRequest())
+            .thenAcceptAsync(reply -> slidingWindow.receiveReply(
+                pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync))
+            .exceptionally(exception -> {
+              // TODO: the exception may be from either raft or state machine.
+              // Currently we skip all the following responses when getting an
+              // exception from the state machine.
+              responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+              return null;
+            });
+      } catch (IOException e) {
+        throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+      }
+    }
+
+    @Override
+    public void onNext(RaftClientRequestProto request) {
+      try {
+        final RaftClientRequest converted = ClientProtoUtils.toRaftClientRequest(request);
+        final PendingAppend pending = new PendingAppend(converted);
+        slidingWindow.receivedRequest(pending, this::processClientRequestAsync);
+      } catch (Throwable e) {
+        responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
+      }
+    }
+
+    /** Writes the reply of a ready request, or closes the stream for the COMPLETED marker. */
+    private void sendReply(PendingAppend ready) {
+      Preconditions.assertTrue(ready.hasReply());
+      if (ready == COMPLETED) {
+        close();
+        return;
+      }
+      LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
+      final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(ready.getReply());
+      responseObserver.onNext(proto);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      // for now we just log a msg
+      GrpcUtil.warn(LOG, () -> name + ": onError", t);
+      slidingWindow.close();
+    }
+
+    @Override
+    public void onCompleted() {
+      final boolean done = slidingWindow.endOfRequests();
+      if (done) {
+        close();
+      }
+    }
+
+    /** Completes the response stream and closes the window; only the first close/error call acts. */
+    private void close() {
+      if (!isClosed.compareAndSet(false, true)) {
+        return;
+      }
+      LOG.debug("{}: close", name);
+      responseObserver.onCompleted();
+      slidingWindow.close();
+    }
+
+    /** Fails the response stream with the unwrapped cause; only the first close/error call acts. */
+    void responseError(Throwable t, Supplier<String> message) {
+      if (!isClosed.compareAndSet(false, true)) {
+        return;
+      }
+      final Throwable cause = JavaUtils.unwrapCompletionException(t);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(name + ": Failed " + message.get(), cause);
+      }
+      responseObserver.onError(GrpcUtil.wrapException(cause));
+      slidingWindow.close();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 48ab95d..47264e7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -21,7 +21,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.GroupManagementRequest;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -46,14 +46,14 @@ import java.io.InterruptedIOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClient> {
+public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClient> {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
 
   private final ClientId clientId;
   private final int maxMessageSize;
 
   public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    super(new PeerProxyMap<>(clientId.toString(), p -> new RaftClientProtocolClient(clientId, p, properties)));
+    super(new PeerProxyMap<>(clientId.toString(), p -> new GrpcClientProtocolClient(clientId, p, properties)));
     this.clientId = clientId;
     this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
   }
@@ -63,7 +63,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
       RaftClientRequest request) {
     final RaftPeerId serverId = request.getServerId();
     try {
-      final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+      final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
       // Reuse the same grpc stream for all async calls.
       return proxy.getAppendStreamObservers().onNext(request);
     } catch (IOException e) {
@@ -75,7 +75,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
-    final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+    final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
     if (request instanceof GroupManagementRequest) {
       final GroupManagementRequestProto proto = ClientProtoUtils.toGroupManagementRequestProto((GroupManagementRequest)request);
       return ClientProtoUtils.toRaftClientReply(proxy.groupAdd(proto));
@@ -102,7 +102,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
   }
 
   private CompletableFuture<RaftClientReply> sendRequest(
-      RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
+      RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
     final RaftClientRequestProto requestProto =
         toRaftClientRequestProto(request);
     final CompletableFuture<RaftClientReplyProto> replyFuture =
@@ -117,7 +117,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
 
           @Override
           public void onError(Throwable t) {
-            replyFuture.completeExceptionally(RaftGrpcUtil.unwrapIOException(t));
+            replyFuture.completeExceptionally(GrpcUtil.unwrapIOException(t));
           }
 
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
new file mode 100644
index 0000000..5e5b941
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientStreamer.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class GrpcClientStreamer implements Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcClientStreamer.class);
+
+  /** State of the streamer; RUNNING and LOOK_FOR_LEADER both count as running. */
+  enum RunningState {
+    RUNNING,
+    LOOK_FOR_LEADER,
+    CLOSED,
+    ERROR
+  }
+
+  /**
+   * Keeps the most recent exception recorded for each peer and counts recorded
+   * exceptions against the configured maximum number of retries.
+   */
+  private static class ExceptionAndRetry {
+    private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
+    private final AtomicInteger retryTimes = new AtomicInteger(0);
+    private final int maxRetryTimes;
+    private final TimeDuration retryInterval;
+
+    ExceptionAndRetry(RaftProperties prop) {
+      this.maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
+      this.retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
+    }
+
+    /** Records {@code e} for {@code peer} (replacing any earlier one) and counts one more retry. */
+    void addException(RaftPeerId peer, IOException e) {
+      exceptionMap.put(peer, e);
+      retryTimes.incrementAndGet();
+    }
+
+    /** Builds a single IOException whose message lists the recorded exceptions per peer. */
+    IOException getCombinedException() {
+      final String message = "Exceptions: " + exceptionMap;
+      return new IOException(message);
+    }
+
+    /** True while the number of recorded exceptions does not exceed the maximum. */
+    boolean shouldRetry() {
+      final int attempts = retryTimes.get();
+      return attempts <= maxRetryTimes;
+    }
+  }
+
+  private final Deque<RaftClientRequestProto> dataQueue;
+  private final Deque<RaftClientRequestProto> ackQueue;
+  private final int maxPendingNum;
+  private final SizeInBytes maxMessageSize;
+
+  private final PeerProxyMap<GrpcClientProtocolProxy> proxyMap;
+  private final Map<RaftPeerId, RaftPeer> peers;
+  private RaftPeerId leaderId;
+  private volatile GrpcClientProtocolProxy leaderProxy;
+  private final ClientId clientId;
+
+  private volatile RunningState running = RunningState.RUNNING;
+  private final ExceptionAndRetry exceptionAndRetry;
+  private final Sender senderThread;
+  private final RaftGroupId groupId;
+
+  /**
+   * Reads the limits from {@code prop}, creates a proxy per peer of
+   * {@code group}, selects the initial leader proxy and starts the sender thread.
+   *
+   * @param leaderId the peer to use as leader first; may be null, in which
+   *                 case the first known peer is used
+   */
+  GrpcClientStreamer(RaftProperties prop, RaftGroup group,
+      RaftPeerId leaderId, ClientId clientId) {
+    this.clientId = clientId;
+    this.maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
+    this.maxMessageSize = GrpcConfigKeys.messageSizeMax(prop, LOG::debug);
+    this.dataQueue = new ConcurrentLinkedDeque<>();
+    this.ackQueue = new ConcurrentLinkedDeque<>();
+    this.exceptionAndRetry = new ExceptionAndRetry(prop);
+
+    this.groupId = group.getGroupId();
+    this.peers = group.getPeers().stream()
+        .collect(Collectors.toMap(RaftPeer::getId, Function.identity()));
+    this.proxyMap = new PeerProxyMap<>(
+        clientId.toString(),
+        peer -> new GrpcClientProtocolProxy(clientId, peer, ResponseHandler::new, prop));
+    this.proxyMap.addPeers(group.getPeers());
+    refreshLeaderProxy(leaderId, null);
+
+    this.senderThread = new Sender();
+    this.senderThread.setName(this.toString() + "-sender");
+    this.senderThread.start();
+  }
+
+  /**
+   * Picks the peer to treat as leader and points leaderProxy at it.
+   * A non-null {@code suggested} wins; otherwise the first known peer is used
+   * when there is no old leader, else the result of
+   * CollectionUtils.random(oldLeader, peers) with oldLeader as fallback when
+   * that result is null. The session of the previous proxy is closed first.
+   */
+  private synchronized void refreshLeaderProxy(RaftPeerId suggested,
+      RaftPeerId oldLeader) {
+    if (suggested != null) {
+      leaderId = suggested;
+    } else if (oldLeader == null) {
+      leaderId = peers.keySet().iterator().next();
+    } else {
+      final RaftPeerId picked = CollectionUtils.random(oldLeader, peers.keySet());
+      leaderId = picked != null ? picked : oldLeader;
+    }
+    LOG.debug("{} switches leader from {} to {}. suggested leader: {}", this,
+        oldLeader, leaderId, suggested);
+
+    final GrpcClientProtocolProxy previous = leaderProxy;
+    if (previous != null) {
+      previous.closeCurrentSession();
+    }
+    try {
+      leaderProxy = proxyMap.getProxy(leaderId);
+    } catch (IOException e) {
+      LOG.error("Should not hit IOException here", e);
+      refreshLeader(null, leaderId);
+    }
+  }
+
+  /**
+   * @return true when the state is RUNNING or LOOK_FOR_LEADER.
+   */
+  private boolean isRunning() {
+    // Read the volatile field once: with two separate reads, a switch from
+    // LOOK_FOR_LEADER to RUNNING between them would make this return false
+    // although the streamer never stopped running.
+    final RunningState state = running;
+    return state == RunningState.RUNNING ||
+        state == RunningState.LOOK_FOR_LEADER;
+  }
+
+  /** Throws (via throwException) unless the streamer is still running. */
+  private void checkState() throws IOException {
+    if (isRunning()) {
+      return;
+    }
+    throwException("The GrpcClientStreamer has been closed");
+  }
+
+  /**
+   * Wraps {@code content} into a request addressed to the current leader and
+   * queues it for the sender thread, blocking while the data queue already
+   * holds the maximum number of pending requests.
+   *
+   * @throws IOException if the streamer is not running, or if the serialized
+   *                     request is larger than the maximum message size
+   */
+  synchronized void write(ByteString content, long seqNum)
+      throws IOException {
+    checkState();
+    // An interrupt does not abort the wait, but it must not be lost either:
+    // remember it and restore the thread's interrupt status afterwards.
+    boolean interrupted = false;
+    try {
+      while (isRunning() && dataQueue.size() >= maxPendingNum) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (isRunning()) {
+      // wrap the current buffer into a RaftClientRequestProto
+      final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
+          clientId, leaderId, groupId, seqNum, seqNum, content);
+      if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
+        throw new IOException("msg size:" + request.getSerializedSize() +
+            " exceeds maximum:" + maxMessageSize.getSizeInt());
+      }
+      dataQueue.offer(request);
+      this.notifyAll();
+    } else {
+      throwException(this + " got closed.");
+    }
+  }
+
+  /**
+   * Blocks until both the data queue and the ack queue are empty.
+   *
+   * @throws IOException if the streamer is not running on entry, or stops
+   *                     running while requests are still queued or unacknowledged
+   */
+  synchronized void flush() throws IOException {
+    checkState();
+    if (dataQueue.isEmpty() && ackQueue.isEmpty()) {
+      return;
+    }
+    // An interrupt does not abort the wait, but it must not be lost either:
+    // remember it and restore the thread's interrupt status afterwards.
+    boolean interrupted = false;
+    try {
+      // wait for the pending Q to become empty
+      while (isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+        try {
+          wait();
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (!isRunning() && (!dataQueue.isEmpty() || !ackQueue.isEmpty())) {
+      throwException(this + " got closed before finishing flush");
+    }
+  }
+
+  /**
+   * Flushes the queued requests, then stops the sender thread and closes the
+   * proxies. Does nothing when the streamer is no longer running on entry.
+   *
+   * @throws IOException if the flush fails or the proxies cannot be closed;
+   *                     the sender thread is stopped and the proxies are
+   *                     closed even when the flush fails
+   */
+  @Override
+  public void close() throws IOException {
+    // NOTE(review): when the streamer is already in ERROR on entry, the
+    // proxies are not closed here (unchanged from before) - confirm whether
+    // that case needs its own teardown.
+    if (!isRunning()) {
+      return;
+    }
+    try {
+      flush();
+    } finally {
+      // Tear down even if flush() threw; otherwise the proxies are never
+      // closed. Keep ERROR so later calls still report the recorded failures.
+      if (running != RunningState.ERROR) {
+        running = RunningState.CLOSED;
+      }
+      senderThread.interrupt();
+      try {
+        senderThread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      proxyMap.close();
+    }
+  }
+
+  /** Returns the simple class name followed by "-" and the client id. */
+  @Override
+  public String toString() {
+    final String simpleName = getClass().getSimpleName();
+    return simpleName + "-" + clientId;
+  }
+
+  /**
+   * The thread that moves requests from the data queue to the leader proxy
+   * and then onto the ack queue, for as long as the streamer is running.
+   */
+  private class Sender extends Daemon {
+    @Override
+    public void run() {
+      while (isRunning()) {
+        synchronized (GrpcClientStreamer.this) {
+          waitUntilSendable();
+          if (running == RunningState.RUNNING) {
+            sendNext();
+          }
+        }
+      }
+    }
+
+    /** Waits on the streamer's monitor while it is running and there is nothing to send now. */
+    private void waitUntilSendable() {
+      while (isRunning() && shouldWait()) {
+        try {
+          GrpcClientStreamer.this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+
+    /** Sends the head of the data queue to the leader and records it as awaiting an ack. */
+    private void sendNext() {
+      Preconditions.assertTrue(!dataQueue.isEmpty(), "dataQueue is empty");
+      final RaftClientRequestProto head = dataQueue.poll();
+      leaderProxy.onNext(head);
+      ackQueue.offer(head);
+    }
+
+    private boolean shouldWait() {
+      // the sender should wait if any of the following is true
+      // 1) there is no data to send
+      if (dataQueue.isEmpty()) {
+        return true;
+      }
+      // 2) there are too many outstanding pending requests
+      if (ackQueue.size() >= maxPendingNum) {
+        return true;
+      }
+      // 3) Error/NotLeaderException just happened, we're still waiting for
+      //    the first response to confirm the new leader
+      return running == RunningState.LOOK_FOR_LEADER;
+    }
+  }
+
+  /** Handles the replies arriving on the stream RPC opened to one target peer. */
+  private class ResponseHandler implements
+      GrpcClientProtocolProxy.CloseableStreamObserver {
+    private final RaftPeerId targetId;
+    // once handled the first NotLeaderException or Error, the handler should
+    // be inactive and should not make any further action.
+    private volatile boolean active = true;
+
+    ResponseHandler(RaftPeer target) {
+      targetId = target.getId();
+    }
+
+    @Override
+    public String toString() {
+      return GrpcClientStreamer.this + "-ResponseHandler-" + targetId;
+    }
+
+    @Override
+    public void onNext(RaftClientReplyProto reply) {
+      if (!active) {
+        return;
+      }
+      synchronized (GrpcClientStreamer.this) {
+        final RaftClientRequestProto pending = Objects.requireNonNull(ackQueue.peek());
+        if (reply.getRpcReply().getSuccess()) {
+          acknowledge(pending, reply);
+        } else {
+          handleFailure(reply);
+        }
+        GrpcClientStreamer.this.notifyAll();
+      }
+    }
+
+    /** Removes the acknowledged head of the ack queue after checking that the call ids match. */
+    private void acknowledge(RaftClientRequestProto pending, RaftClientReplyProto reply) {
+      Preconditions.assertTrue(pending.getRpcRequest().getCallId() == reply.getRpcReply().getCallId(),
+          () -> "pending=" + ClientProtoUtils.toString(pending) + " but reply=" + ClientProtoUtils.toString(reply));
+      ackQueue.poll();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} received success ack for {}", this, ClientProtoUtils.toString(pending));
+      }
+      // we've identified the correct leader
+      if (running == RunningState.LOOK_FOR_LEADER) {
+        running = RunningState.RUNNING;
+      }
+    }
+
+    /** Reacts to an unsuccessful reply; only a NotLeaderException triggers any action. */
+    private void handleFailure(RaftClientReplyProto reply) {
+      // this may be a NotLeaderException
+      final RaftClientReply converted = ClientProtoUtils.toRaftClientReply(reply);
+      final NotLeaderException nle = converted.getNotLeaderException();
+      if (nle == null) {
+        return;
+      }
+      LOG.debug("{} received a NotLeaderException from {}", this,
+          converted.getServerId());
+      handleNotLeader(nle, targetId);
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      LOG.warn(this + " onError", t);
+      if (!active) {
+        return;
+      }
+      synchronized (GrpcClientStreamer.this) {
+        handleError(t, this);
+        GrpcClientStreamer.this.notifyAll();
+      }
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("{} onCompleted, pending requests #: {}", this,
+          ackQueue.size());
+    }
+
+    @Override // called by handleError and handleNotLeader
+    public void close() throws IOException {
+      active = false;
+    }
+  }
+
+  /**
+   * Always throws: the combined recorded exceptions when the state is ERROR,
+   * otherwise a new IOException carrying {@code msg}.
+   */
+  private void throwException(String msg) throws IOException {
+    if (running != RunningState.ERROR) {
+      throw new IOException(msg);
+    }
+    throw exceptionAndRetry.getCombinedException();
+  }
+
+  /**
+   * Handles a NotLeaderException from {@code oldLeader}: adds the peers it
+   * carries and switches to the leader it suggests. Must be called while
+   * holding the streamer's lock.
+   */
+  private void handleNotLeader(NotLeaderException nle,
+      RaftPeerId oldLeader) {
+    Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
+    // handle NotLeaderException: refresh leader and RaftConfiguration
+    refreshPeers(nle.getPeers());
+
+    // The exception may carry no suggested leader; pass null in that case so
+    // that refreshLeaderProxy picks another peer instead of failing here
+    // with a NullPointerException.
+    final RaftPeerId suggestedId = nle.getSuggestedLeader() == null ?
+        null : nle.getSuggestedLeader().getId();
+    refreshLeader(suggestedId, oldLeader);
+  }
+
+  /**
+   * Records the error reported by {@code handler}, marks the leader proxy's
+   * session as failed, then either retries with another leader or moves to
+   * the ERROR state. Must be called while holding the streamer's lock.
+   */
+  private void handleError(Throwable t, ResponseHandler handler) {
+    Preconditions.assertTrue(Thread.holdsLock(GrpcClientStreamer.this));
+    final IOException cause = GrpcUtil.unwrapIOException(t);
+    exceptionAndRetry.addException(handler.targetId, cause);
+    LOG.debug("{} got error: {}. Total retry times {}, max retry times {}.",
+        handler, cause, exceptionAndRetry.retryTimes.get(),
+        exceptionAndRetry.maxRetryTimes);
+
+    leaderProxy.onError();
+    if (!exceptionAndRetry.shouldRetry()) {
+      running = RunningState.ERROR;
+      return;
+    }
+    refreshLeader(null, leaderId);
+  }
+
+  /**
+   * Switches to a new leader, moves all unacknowledged requests back to the
+   * front of the data queue, and re-sends the first of them after the retry
+   * interval. The state stays LOOK_FOR_LEADER until that request is
+   * acknowledged.
+   */
+  private void refreshLeader(RaftPeerId suggestedLeader, RaftPeerId oldLeader) {
+    running = RunningState.LOOK_FOR_LEADER;
+    refreshLeaderProxy(suggestedLeader, oldLeader);
+    reQueuePendingRequests(leaderId);
+
+    final RaftClientRequestProto request = dataQueue.poll();
+    if (request == null) {
+      // Nothing is queued or in flight, so there is no request to probe the
+      // new leader with. Previously this threw a NullPointerException and
+      // left the state at LOOK_FOR_LEADER, in which the sender never sends.
+      // Go back to RUNNING so that the next written request is sent.
+      running = RunningState.RUNNING;
+      return;
+    }
+    ackQueue.offer(request);
+    try {
+      exceptionAndRetry.retryInterval.sleep();
+    } catch (InterruptedException ignored) {
+    }
+    leaderProxy.onNext(request);
+  }
+
+  /**
+   * While running, moves every request of the ack queue back to the front of
+   * the data queue (keeping their order) with the reply id set to
+   * {@code newLeader}.
+   */
+  private void reQueuePendingRequests(RaftPeerId newLeader) {
+    if (!isRunning()) {
+      return;
+    }
+    // resend all the pending requests
+    for (RaftClientRequestProto old = ackQueue.pollLast(); old != null; old = ackQueue.pollLast()) {
+      final RaftRpcRequestProto.Builder rpc = RaftRpcRequestProto.newBuilder(old.getRpcRequest())
+          .setReplyId(newLeader.toByteString());
+      final RaftClientRequestProto readdressed = RaftClientRequestProto.newBuilder(old)
+          .setRpcRequest(rpc).build();
+      dataQueue.offerFirst(readdressed);
+    }
+  }
+
+  /** Adds the given peers, and a proxy for each, unless they are already known. */
+  private void refreshPeers(RaftPeer[] newPeers) {
+    if (newPeers == null || newPeers.length == 0) {
+      return;
+    }
+    // we only add new peers, we do not remove any peer even if it no longer
+    // belongs to the current raft conf
+    for (RaftPeer peer : newPeers) {
+      peers.putIfAbsent(peer.getId(), peer);
+      proxyMap.computeIfAbsent(peer);
+    }
+
+    LOG.debug("refreshed peers: {}", peers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
new file mode 100644
index 0000000..e857aaf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcOutputStream.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.client;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An OutputStream that collects written bytes in a fixed-size buffer and
+ * hands each full (or flushed) buffer to a {@link GrpcClientStreamer} under
+ * an increasing sequence number.
+ */
+public class GrpcOutputStream extends OutputStream {
+  /** internal buffer */
+  private final byte[] buf;
+  /** number of valid bytes currently in buf */
+  private int count;
+  private final AtomicLong seqNum = new AtomicLong();
+  private final ClientId clientId;
+  private final GrpcClientStreamer streamer;
+
+  private boolean closed = false;
+
+  /**
+   * Allocates the buffer with the size configured in {@code prop} and creates
+   * the streamer for {@code group}, starting at {@code leaderId}.
+   */
+  public GrpcOutputStream(RaftProperties prop, ClientId clientId,
+      RaftGroup group, RaftPeerId leaderId) {
+    final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
+    buf = new byte[bufferSize];
+    count = 0;
+    this.clientId = clientId;
+    streamer = new GrpcClientStreamer(prop, group, leaderId, clientId);
+  }
+
+  /**
+   * Buffers one byte, handing the buffer to the streamer when it becomes full.
+   *
+   * @throws IOException if this stream was closed or the streamer rejects the data
+   */
+  @Override
+  public void write(int b) throws IOException {
+    checkClosed();
+    buf[count++] = (byte)b;
+    flushIfNecessary();
+  }
+
+  /** Hands the buffer to the streamer once it is completely filled. */
+  private void flushIfNecessary() throws IOException {
+    if(count == buf.length) {
+      flushToStreamer();
+    }
+  }
+
+  /**
+   * Buffers {@code len} bytes of {@code b} starting at {@code off}, handing
+   * the buffer to the streamer each time it becomes full.
+   *
+   * @throws ArrayIndexOutOfBoundsException if {@code off} or {@code len} is
+   *         negative or the range exceeds the array
+   * @throws IOException if this stream was closed or the streamer rejects the data
+   */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    checkClosed();
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    int total = 0;
+    while (total < len) {
+      int toWrite = Math.min(len - total, buf.length - count);
+      System.arraycopy(b, off + total, buf, count, toWrite);
+      count += toWrite;
+      total += toWrite;
+      flushIfNecessary();
+    }
+  }
+
+  /** Passes the buffered bytes, if any, to the streamer with the next sequence number. */
+  private void flushToStreamer() throws IOException {
+    if (count > 0) {
+      streamer.write(ProtoUtils.toByteString(buf, 0, count),
+          seqNum.getAndIncrement());
+      count = 0;
+    }
+  }
+
+  /**
+   * Passes the buffered bytes to the streamer and waits for the streamer to flush.
+   *
+   * @throws IOException if this stream was closed or the streamer fails
+   */
+  @Override
+  public void flush() throws IOException {
+    checkClosed();
+    flushToStreamer();
+    streamer.flush();
+  }
+
+  /**
+   * Passes the remaining buffered bytes to the streamer and closes it.
+   * The streamer is closed and this stream is marked closed even when handing
+   * over the remaining bytes fails; closing an already closed stream has no
+   * effect.
+   *
+   * @throws IOException if handing over the remaining bytes or closing the
+   *                     streamer fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    // try-with-resources closes the streamer (which flushes) whether or not
+    // flushToStreamer() throws, and keeps a secondary failure as suppressed.
+    try (GrpcClientStreamer closing = streamer) {
+      flushToStreamer();
+    } finally {
+      this.closed = true;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "GrpcOutputStream-" + clientId;
+  }
+
+  /** @throws IOException if this stream has been closed */
+  private void checkClosed() throws IOException {
+    if (closed) {
+      throw new IOException(this.toString() + " was closed.");
+    }
+  }
+}


[2/3] incubator-ratis git commit: RATIS-324. Rename grpc classes. Contributed by Tsz Wo Nicholas Sze.

Posted by lj...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
deleted file mode 100644
index 9dd1a31..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.RaftClientConfigKeys;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.util.TimeoutScheduler;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
-import org.apache.ratis.shaded.io.grpc.netty.NegotiationType;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
-import org.apache.ratis.util.CheckedSupplier;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.SizeInBytes;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-public class RaftClientProtocolClient implements Closeable {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolClient.class);
-
-  private final Supplier<String> name;
-  private final RaftPeer target;
-  private final ManagedChannel channel;
-
-  private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
-
-  private final RaftClientProtocolServiceBlockingStub blockingStub;
-  private final RaftClientProtocolServiceStub asyncStub;
-  private final AdminProtocolServiceBlockingStub adminBlockingStub;
-
-  private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
-
-  public RaftClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties) {
-    this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
-    this.target = target;
-
-    final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
-    final SizeInBytes maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug);
-    channel = NettyChannelBuilder.forTarget(target.getAddress())
-        .negotiationType(NegotiationType.PLAINTEXT)
-        .flowControlWindow(flowControlWindow.getSizeInt())
-        .maxInboundMessageSize(maxMessageSize.getSizeInt())
-        .build();
-    blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
-    adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
-    this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
-  }
-
-  String getName() {
-    return name.get();
-  }
-
-  @Override
-  public void close() {
-    final AsyncStreamObservers observers = appendStreamObservers.get();
-    if (observers != null) {
-      observers.close();
-    }
-    channel.shutdownNow();
-  }
-
-  RaftClientReplyProto groupAdd(GroupManagementRequestProto request) throws IOException {
-    return blockingCall(() -> adminBlockingStub
-        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .groupManagement(request));
-  }
-
-  ServerInformationReplyProto serverInformation(ServerInformationRequestProto request) {
-    return adminBlockingStub
-        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .serverInformation(request);
-  }
-
-  RaftClientReplyProto setConfiguration(
-      SetConfigurationRequestProto request) throws IOException {
-    return blockingCall(() -> blockingStub
-        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .setConfiguration(request));
-  }
-
-  private static RaftClientReplyProto blockingCall(
-      CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
-      ) throws IOException {
-    try {
-      return supplier.get();
-    } catch (StatusRuntimeException e) {
-      throw RaftGrpcUtil.unwrapException(e);
-    }
-  }
-
-  StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseHandler) {
-    return asyncStub.append(responseHandler);
-  }
-
-  StreamObserver<RaftClientRequestProto> appendWithTimeout(
-      StreamObserver<RaftClientReplyProto> responseHandler) {
-    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .append(responseHandler);
-  }
-
-  AsyncStreamObservers getAppendStreamObservers() {
-    return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
-  }
-
-  public RaftPeer getTarget() {
-    return target;
-  }
-
-  class AsyncStreamObservers implements Closeable {
-    /** Request map: callId -> future */
-    private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
-    private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
-      @Override
-      public void onNext(RaftClientReplyProto proto) {
-        final long callId = proto.getRpcReply().getCallId();
-        try {
-          final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
-          final NotLeaderException nle = reply.getNotLeaderException();
-          if (nle != null) {
-            completeReplyExceptionally(nle, NotLeaderException.class.getName());
-            return;
-          }
-          handleReplyFuture(callId, f -> f.complete(reply));
-        } catch (Throwable t) {
-          handleReplyFuture(callId, f -> f.completeExceptionally(t));
-        }
-      }
-
-      @Override
-      public void onError(Throwable t) {
-        final IOException ioe = RaftGrpcUtil.unwrapIOException(t);
-        completeReplyExceptionally(ioe, "onError");
-      }
-
-      @Override
-      public void onCompleted() {
-        completeReplyExceptionally(null, "completed");
-      }
-    };
-    private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
-
-    CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
-      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
-      if (map == null) {
-        return JavaUtils.completeExceptionally(new IOException("Already closed."));
-      }
-      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
-      CollectionUtils.putNew(request.getCallId(), f, map,
-          () -> getName() + ":" + getClass().getSimpleName());
-      try {
-        requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
-        scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
-            () -> "Timeout check failed for client request: " + request);
-      } catch(Throwable t) {
-        handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
-      }
-      return f;
-    }
-
-    private void timeoutCheck(RaftClientRequest request) {
-      handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
-          new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
-    }
-
-    private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
-      Optional.ofNullable(replies.get())
-          .map(replyMap -> replyMap.remove(callId))
-          .ifPresent(handler);
-    }
-
-    @Override
-    public void close() {
-      requestStreamObserver.onCompleted();
-      completeReplyExceptionally(null, "close");
-    }
-
-    private void completeReplyExceptionally(Throwable t, String event) {
-      appendStreamObservers.compareAndSet(this, null);
-      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
-      if (map == null) {
-        return;
-      }
-      for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
-        final CompletableFuture<RaftClientReply> f = entry.getValue();
-        if (!f.isDone()) {
-          f.completeExceptionally(t != null? t
-              : new IOException(getName() + ": Stream " + event
-                  + ": no reply for async request cid=" + entry.getKey()));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
deleted file mode 100644
index ee9ce4e..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolProxy.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.protocol.RaftPeer;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.function.Function;
-
-public class RaftClientProtocolProxy implements Closeable {
-  private final RaftClientProtocolClient proxy;
-  private final Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation;
-  private RpcSession currentSession;
-
-  public RaftClientProtocolProxy(ClientId clientId, RaftPeer target,
-      Function<RaftPeer, CloseableStreamObserver> responseHandlerCreation,
-      RaftProperties properties) {
-    proxy = new RaftClientProtocolClient(clientId, target, properties);
-    this.responseHandlerCreation = responseHandlerCreation;
-  }
-
-  @Override
-  public void close() throws IOException {
-    closeCurrentSession();
-    proxy.close();
-  }
-
-  @Override
-  public String toString() {
-    return "ProxyTo:" + proxy.getTarget();
-  }
-
-  public void closeCurrentSession() {
-    if (currentSession != null) {
-      currentSession.close();
-      currentSession = null;
-    }
-  }
-
-  public void onNext(RaftClientRequestProto request) {
-    if (currentSession == null) {
-      currentSession = new RpcSession(
-          responseHandlerCreation.apply(proxy.getTarget()));
-    }
-    currentSession.requestObserver.onNext(request);
-  }
-
-  public void onError() {
-    if (currentSession != null) {
-      currentSession.onError();
-    }
-  }
-
-  public interface CloseableStreamObserver
-      extends StreamObserver<RaftClientReplyProto>, Closeable {
-  }
-
-  class RpcSession implements Closeable {
-    private final StreamObserver<RaftClientRequestProto> requestObserver;
-    private final CloseableStreamObserver responseHandler;
-    private boolean hasError = false;
-
-    RpcSession(CloseableStreamObserver responseHandler) {
-      this.responseHandler = responseHandler;
-      this.requestObserver = proxy.append(responseHandler);
-    }
-
-    void onError() {
-      hasError = true;
-    }
-
-    @Override
-    public void close() {
-      if (!hasError) {
-        try {
-          requestObserver.onCompleted();
-        } catch (Exception ignored) {
-        }
-      }
-      try {
-        responseHandler.close();
-      } catch (IOException ignored) {
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
deleted file mode 100644
index 4b92be5..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.SlidingWindow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
-public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
-  public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
-
-  private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
-    private final RaftClientRequest request;
-    private volatile RaftClientReply reply;
-
-    PendingAppend(RaftClientRequest request) {
-      this.request = request;
-    }
-
-    @Override
-    public boolean hasReply() {
-      return reply != null || this == COMPLETED;
-    }
-
-    @Override
-    public void setReply(RaftClientReply reply) {
-      this.reply = reply;
-    }
-
-    RaftClientReply getReply() {
-      return reply;
-    }
-
-    RaftClientRequest getRequest() {
-      return request;
-    }
-
-    @Override
-    public long getSeqNum() {
-      return request != null? request.getSeqNum(): Long.MAX_VALUE;
-    }
-
-    @Override
-    public String toString() {
-      return request != null? getSeqNum() + ":" + reply: "COMPLETED";
-    }
-  }
-  private static final PendingAppend COMPLETED = new PendingAppend(null);
-
-  private final Supplier<RaftPeerId> idSupplier;
-  private final RaftClientAsynchronousProtocol protocol;
-
-  public RaftClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
-    this.idSupplier = idSupplier;
-    this.protocol = protocol;
-  }
-
-  RaftPeerId getId() {
-    return idSupplier.get();
-  }
-
-  @Override
-  public void setConfiguration(SetConfigurationRequestProto proto,
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
-    RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request),
-        ClientProtoUtils::toRaftClientReplyProto);
-  }
-
-  @Override
-  public StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    return new AppendRequestStreamObserver(responseObserver);
-  }
-
-  private final AtomicInteger streamCount = new AtomicInteger();
-
-  private class AppendRequestStreamObserver implements
-      StreamObserver<RaftClientRequestProto> {
-    private final String name = getId() + "-" +  streamCount.getAndIncrement();
-    private final StreamObserver<RaftClientReplyProto> responseObserver;
-    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
-        = new SlidingWindow.Server<>(name, COMPLETED);
-    private final AtomicBoolean isClosed;
-
-    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
-      LOG.debug("new AppendRequestStreamObserver {}", name);
-      this.responseObserver = ro;
-      this.isClosed = new AtomicBoolean(false);
-    }
-
-    void processClientRequestAsync(PendingAppend pending) {
-      try {
-        protocol.submitClientRequestAsync(pending.getRequest()
-        ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
-            pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
-        ).exceptionally(exception -> {
-          // TODO: the exception may be from either raft or state machine.
-          // Currently we skip all the following responses when getting an
-          // exception from the state machine.
-          responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
-          return null;
-        });
-      } catch (IOException e) {
-        throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
-      }
-    }
-
-    @Override
-    public void onNext(RaftClientRequestProto request) {
-      try {
-        final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
-        final PendingAppend p = new PendingAppend(r);
-        slidingWindow.receivedRequest(p, this::processClientRequestAsync);
-      } catch (Throwable e) {
-        responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
-      }
-    }
-
-    private void sendReply(PendingAppend ready) {
-        Preconditions.assertTrue(ready.hasReply());
-        if (ready == COMPLETED) {
-          close();
-        } else {
-          LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
-          responseObserver.onNext(
-              ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
-        }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      // for now we just log a msg
-      RaftGrpcUtil.warn(LOG, () -> name + ": onError", t);
-      slidingWindow.close();
-    }
-
-    @Override
-    public void onCompleted() {
-      if (slidingWindow.endOfRequests()) {
-        close();
-      }
-    }
-
-    private void close() {
-      if (isClosed.compareAndSet(false, true)) {
-        LOG.debug("{}: close", name);
-        responseObserver.onCompleted();
-        slidingWindow.close();
-      }
-    }
-
-    void responseError(Throwable t, Supplier<String> message) {
-      if (isClosed.compareAndSet(false, true)) {
-        t = JavaUtils.unwrapCompletionException(t);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(name + ": Failed " + message.get(), t);
-        }
-        responseObserver.onError(RaftGrpcUtil.wrapException(t));
-        slidingWindow.close();
-      }
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
deleted file mode 100644
index 09d57a0..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.client;
-
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.util.ProtoUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class RaftOutputStream extends OutputStream {
-  /** internal buffer */
-  private final byte buf[];
-  private int count;
-  private final AtomicLong seqNum = new AtomicLong();
-  private final ClientId clientId;
-  private final AppendStreamer streamer;
-
-  private boolean closed = false;
-
-  public RaftOutputStream(RaftProperties prop, ClientId clientId,
-      RaftGroup group, RaftPeerId leaderId) {
-    final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
-    buf = new byte[bufferSize];
-    count = 0;
-    this.clientId = clientId;
-    streamer = new AppendStreamer(prop, group, leaderId, clientId);
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    checkClosed();
-    buf[count++] = (byte)b;
-    flushIfNecessary();
-  }
-
-  private void flushIfNecessary() throws IOException {
-    if(count == buf.length) {
-      flushToStreamer();
-    }
-  }
-
-  @Override
-  public void write(byte b[], int off, int len) throws IOException {
-    checkClosed();
-    if (off < 0 || len < 0 || off > b.length - len) {
-      throw new ArrayIndexOutOfBoundsException();
-    }
-
-    int total = 0;
-    while (total < len) {
-      int toWrite = Math.min(len - total, buf.length - count);
-      System.arraycopy(b, off + total, buf, count, toWrite);
-      count += toWrite;
-      total += toWrite;
-      flushIfNecessary();
-    }
-  }
-
-  private void flushToStreamer() throws IOException {
-    if (count > 0) {
-      streamer.write(ProtoUtils.toByteString(buf, 0, count),
-          seqNum.getAndIncrement());
-      count = 0;
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    checkClosed();
-    flushToStreamer();
-    streamer.flush();
-  }
-
-  @Override
-  public void close() throws IOException {
-    flushToStreamer();
-    streamer.close(); // streamer will flush
-    this.closed = true;
-  }
-
-  @Override
-  public String toString() {
-    return "RaftOutputStream-" + clientId;
-  }
-
-  private void checkClosed() throws IOException {
-    if (closed) {
-      throw new IOException(this.toString() + " was closed.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
deleted file mode 100644
index d65abd0..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.AdminAsynchronousProtocol;
-import org.apache.ratis.protocol.GroupManagementRequest;
-import org.apache.ratis.protocol.ServerInformationRequest;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
-import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
-
-public class AdminProtocolService extends AdminProtocolServiceImplBase {
-  private final AdminAsynchronousProtocol protocol;
-
-  public AdminProtocolService(AdminAsynchronousProtocol protocol) {
-    this.protocol = protocol;
-  }
-
-  @Override
-  public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) {
-    final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto);
-    RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request),
-        ClientProtoUtils::toRaftClientReplyProto);
-  }
-
-  @Override
-  public void serverInformation(ServerInformationRequestProto proto,
-      StreamObserver<ServerInformationReplyProto> responseObserver) {
-    final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto);
-    RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request),
-        ClientProtoUtils::toServerInformationReplyProto);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
deleted file mode 100644
index 7dfe033..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.grpc.GrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGRpcService;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A new log appender implementation using grpc bi-directional stream API.
- */
-public class GRpcLogAppender extends LogAppender {
-  public static final Logger LOG = LoggerFactory.getLogger(GRpcLogAppender.class);
-
-  private final RaftGRpcService rpcService;
-  private final Map<Long, AppendEntriesRequestProto> pendingRequests;
-  private final int maxPendingRequestsNum;
-  private long callId = 0;
-  private volatile boolean firstResponseReceived = false;
-
-  private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
-
-  private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
-
-  public GRpcLogAppender(RaftServerImpl server, LeaderState leaderState,
-                         FollowerInfo f) {
-    super(server, leaderState, f);
-
-    this.rpcService = (RaftGRpcService) server.getServerRpc();
-
-    maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
-        server.getProxy().getProperties());
-    requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
-    pendingRequests = new ConcurrentHashMap<>();
-  }
-
-  private RaftServerProtocolClient getClient() throws IOException {
-    return rpcService.getProxies().getProxy(follower.getPeer().getId());
-  }
-
-  private synchronized void resetClient(AppendEntriesRequestProto request) {
-    rpcService.getProxies().resetProxy(follower.getPeer().getId());
-    appendLogRequestObserver = null;
-    firstResponseReceived = false;
-
-    // clear the pending requests queue and reset the next index of follower
-    final long nextIndex = request != null && request.hasPreviousLog()?
-        request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
-    clearPendingRequests(nextIndex);
-  }
-
-  @Override
-  protected void runAppenderImpl() throws IOException {
-    for(; isAppenderRunning(); mayWait()) {
-      if (shouldSendRequest()) {
-        SnapshotInfo snapshot = shouldInstallSnapshot();
-        if (snapshot != null) {
-          installSnapshot(snapshot);
-        } else if (!shouldWait()) {
-          // keep appending log entries or sending heartbeats
-          appendLog();
-        }
-      }
-      checkSlowness();
-    }
-
-    Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
-  }
-
-  private long getWaitTimeMs() {
-    if (!shouldSendRequest()) {
-      return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
-    } else if (shouldWait()) {
-      return halfMinTimeoutMs; // Should wait for a short time
-    }
-    return 0L;
-  }
-
-  private void mayWait() {
-    // use lastSend time instead of lastResponse time
-    final long waitTimeMs = getWaitTimeMs();
-    if (waitTimeMs <= 0L) {
-      return;
-    }
-
-    synchronized(this) {
-      try {
-        LOG.trace("{}: wait {}ms", this, waitTimeMs);
-        wait(waitTimeMs);
-      } catch(InterruptedException ie) {
-        LOG.warn(this + ": Wait interrupted by " + ie);
-      }
-    }
-  }
-
-  @Override
-  protected boolean shouldSendRequest() {
-    return appendLogRequestObserver == null || super.shouldSendRequest();
-  }
-
-  /** @return true iff not received first response or queue is full. */
-  private boolean shouldWait() {
-    final int size = pendingRequests.size();
-    if (size == 0) {
-      return false;
-    }
-    return !firstResponseReceived || size >= maxPendingRequestsNum;
-  }
-
-  private void appendLog() throws IOException {
-    final AppendEntriesRequestProto pending;
-    final StreamObserver<AppendEntriesRequestProto> s;
-    synchronized (this) {
-      // prepare and enqueue the append request. note changes on follower's
-      // nextIndex and ops on pendingRequests should always be associated
-      // together and protected by the lock
-      pending = createRequest(callId++);
-      if (pending == null) {
-        return;
-      }
-      pendingRequests.put(pending.getServerRequest().getCallId(), pending);
-      updateNextIndex(pending);
-      if (appendLogRequestObserver == null) {
-        appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
-      }
-      s = appendLogRequestObserver;
-    }
-
-    if (isAppenderRunning()) {
-      sendRequest(pending, s);
-    }
-  }
-
-  private void sendRequest(AppendEntriesRequestProto request,
-      StreamObserver<AppendEntriesRequestProto> s) {
-    CodeInjectionForTesting.execute(RaftGRpcService.GRPC_SEND_SERVER_REQUEST,
-        server.getId(), null, request);
-
-    s.onNext(request);
-    scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
-        () -> "Timeout check failed for append entry request: " + request);
-    follower.updateLastRpcSendTime();
-  }
-
-  private void timeoutAppendRequest(AppendEntriesRequestProto request) {
-    AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
-    if (pendingRequest != null) {
-      LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
-    }
-  }
-
-  private void updateNextIndex(AppendEntriesRequestProto request) {
-    final int count = request.getEntriesCount();
-    if (count > 0) {
-      follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
-    }
-  }
-
-  /**
-   * StreamObserver for handling responses from the follower
-   */
-  private class AppendLogResponseHandler
-      implements StreamObserver<AppendEntriesReplyProto> {
-    /**
-     * After receiving a appendEntries reply, do the following:
-     * 1. If the reply is success, update the follower's match index and submit
-     *    an event to leaderState
-     * 2. If the reply is NOT_LEADER, step down
-     * 3. If the reply is INCONSISTENCY, decrease the follower's next index
-     *    based on the response
-     */
-    @Override
-    public void onNext(AppendEntriesReplyProto reply) {
-      LOG.debug("{} received {} response from {}", server.getId(),
-          (!firstResponseReceived ? "the first" : "a"),
-          follower.getPeer());
-
-      // update the last rpc time
-      follower.updateLastRpcResponseTime();
-
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
-      switch (reply.getResult()) {
-        case SUCCESS:
-          onSuccess(reply);
-          break;
-        case NOT_LEADER:
-          onNotLeader(reply);
-          break;
-        case INCONSISTENCY:
-          onInconsistency(reply);
-          break;
-        default:
-          break;
-      }
-      notifyAppend();
-    }
-
-    /**
-     * for now we simply retry the first pending request
-     */
-    @Override
-    public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
-        LOG.info("{} is stopped", GRpcLogAppender.this);
-        return;
-      }
-      RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
-
-      long callId = RaftGrpcUtil.getCallId(t);
-      resetClient(pendingRequests.get(callId));
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} stops appending log entries to follower {}", server.getId(),
-          follower);
-    }
-  }
-
-  private void clearPendingRequests(long newNextIndex) {
-    pendingRequests.clear();
-    follower.decreaseNextIndex(newNextIndex);
-  }
-
-  protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
-    if (request == null) {
-      // If reply comes after timeout, the reply is ignored.
-      LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply));
-      return;
-    }
-    updateCommitIndex(request.getLeaderCommit());
-
-    final long replyNextIndex = reply.getNextIndex();
-    final long lastIndex = replyNextIndex - 1;
-    final boolean updateMatchIndex;
-
-    if (request.getEntriesCount() == 0) {
-      Preconditions.assertTrue(!request.hasPreviousLog() ||
-              lastIndex == request.getPreviousLog().getIndex(),
-          "reply's next index is %s, request's previous is %s",
-          replyNextIndex, request.getPreviousLog());
-      updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex;
-    } else {
-      // check if the reply and the pending request is consistent
-      final long lastEntryIndex = request
-          .getEntries(request.getEntriesCount() - 1).getIndex();
-      Preconditions.assertTrue(lastIndex == lastEntryIndex,
-          "reply's next index is %s, request's last entry index is %s",
-          replyNextIndex, lastEntryIndex);
-      updateMatchIndex = true;
-    }
-    if (updateMatchIndex) {
-      follower.updateMatchIndex(lastIndex);
-      submitEventOnSuccessAppend();
-    }
-  }
-
-  private void onNotLeader(AppendEntriesReplyProto reply) {
-    checkResponseTerm(reply.getTerm());
-    // the running loop will end and the connection will onComplete
-  }
-
-  private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
-    AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
-    if (request == null) {
-      // If reply comes after timeout, the reply is ignored.
-      LOG.warn("{}: Ignoring {}", server.getId(), reply);
-      return;
-    }
-    Preconditions.assertTrue(request.hasPreviousLog());
-    if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
-      clearPendingRequests(reply.getNextIndex());
-    }
-  }
-
-  private class InstallSnapshotResponseHandler
-      implements StreamObserver<InstallSnapshotReplyProto> {
-    private final Queue<Integer> pending;
-    private final AtomicBoolean done = new AtomicBoolean(false);
-
-    InstallSnapshotResponseHandler() {
-      pending = new LinkedList<>();
-    }
-
-    synchronized void addPending(InstallSnapshotRequestProto request) {
-      pending.offer(request.getRequestIndex());
-    }
-
-    synchronized void removePending(InstallSnapshotReplyProto reply) {
-      int index = pending.poll();
-      Preconditions.assertTrue(index == reply.getRequestIndex());
-    }
-
-    boolean isDone() {
-      return done.get();
-    }
-
-    void close() {
-      done.set(true);
-      GRpcLogAppender.this.notifyAppend();
-    }
-
-    synchronized boolean hasAllResponse() {
-      return pending.isEmpty();
-    }
-
-    @Override
-    public void onNext(InstallSnapshotReplyProto reply) {
-      LOG.debug("{} received {} response from {}", server.getId(),
-          (!firstResponseReceived ? "the first" : "a"),
-          follower.getPeer());
-
-      // update the last rpc time
-      follower.updateLastRpcResponseTime();
-
-      if (!firstResponseReceived) {
-        firstResponseReceived = true;
-      }
-
-      switch (reply.getResult()) {
-        case SUCCESS:
-          removePending(reply);
-          break;
-        case NOT_LEADER:
-          checkResponseTerm(reply.getTerm());
-          break;
-        case UNRECOGNIZED:
-          break;
-      }
-    }
-
-    @Override
-    public void onError(Throwable t) {
-      if (!isAppenderRunning()) {
-        LOG.info("{} is stopped", GRpcLogAppender.this);
-        return;
-      }
-      LOG.info("{} got error when installing snapshot to {}, exception: {}",
-          server.getId(), follower.getPeer(), t);
-      resetClient(null);
-      close();
-    }
-
-    @Override
-    public void onCompleted() {
-      LOG.info("{} stops sending snapshots to follower {}", server.getId(),
-          follower);
-      close();
-    }
-  }
-
-  private void installSnapshot(SnapshotInfo snapshot) {
-    LOG.info("{}: follower {}'s next index is {}," +
-            " log's start index is {}, need to install snapshot",
-        server.getId(), follower.getPeer(), follower.getNextIndex(),
-        raftLog.getStartIndex());
-
-    final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
-    StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
-    final String requestId = UUID.randomUUID().toString();
-    try {
-      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
-      for (InstallSnapshotRequestProto request :
-          new SnapshotRequestIter(snapshot, requestId)) {
-        if (isAppenderRunning()) {
-          snapshotRequestObserver.onNext(request);
-          follower.updateLastRpcSendTime();
-          responseHandler.addPending(request);
-        } else {
-          break;
-        }
-      }
-      snapshotRequestObserver.onCompleted();
-    } catch (Exception e) {
-      LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
-          snapshot.getFiles(), e);
-      if (snapshotRequestObserver != null) {
-        snapshotRequestObserver.onError(e);
-      }
-      return;
-    }
-
-    synchronized (this) {
-      while (isAppenderRunning() && !responseHandler.isDone()) {
-        try {
-          wait();
-        } catch (InterruptedException ignored) {
-        }
-      }
-    }
-
-    if (responseHandler.hasAllResponse()) {
-      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
-      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
-      LOG.info("{}: install snapshot-{} successfully on follower {}",
-          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
new file mode 100644
index 0000000..1201bf2
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcAdminProtocolService.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
+import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.ServerInformationRequest;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.GroupManagementRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ServerInformationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
+
+public class GrpcAdminProtocolService extends AdminProtocolServiceImplBase {
+  private final AdminAsynchronousProtocol protocol;
+
+  public GrpcAdminProtocolService(AdminAsynchronousProtocol protocol) {
+    this.protocol = protocol;
+  }
+
+  @Override
+  public void groupManagement(GroupManagementRequestProto proto, StreamObserver<RaftClientReplyProto> responseObserver) {
+    final GroupManagementRequest request = ClientProtoUtils.toGroupManagementRequest(proto);
+    GrpcUtil.asyncCall(responseObserver, () -> protocol.groupManagementAsync(request),
+        ClientProtoUtils::toRaftClientReplyProto);
+  }
+
+  @Override
+  public void serverInformation(ServerInformationRequestProto proto,
+      StreamObserver<ServerInformationReplyProto> responseObserver) {
+    final ServerInformationRequest request = ClientProtoUtils.toServerInformationRequest(proto);
+    GrpcUtil.asyncCall(responseObserver, () -> protocol.getInfoAsync(request),
+        ClientProtoUtils::toServerInformationReplyProto);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
new file mode 100644
index 0000000..3da58bf
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A new log appender implementation using grpc bi-directional stream API.
+ */
+public class GrpcLogAppender extends LogAppender {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
+
+  private final GrpcService rpcService;
+  private final Map<Long, AppendEntriesRequestProto> pendingRequests;
+  private final int maxPendingRequestsNum;
+  private long callId = 0;
+  private volatile boolean firstResponseReceived = false;
+
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+
+  private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver;
+
+  public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState,
+                         FollowerInfo f) {
+    super(server, leaderState, f);
+
+    this.rpcService = (GrpcService) server.getServerRpc();
+
+    maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(
+        server.getProxy().getProperties());
+    requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(server.getProxy().getProperties());
+    pendingRequests = new ConcurrentHashMap<>();
+  }
+
+  private GrpcServerProtocolClient getClient() throws IOException {
+    return rpcService.getProxies().getProxy(follower.getPeer().getId());
+  }
+
+  private synchronized void resetClient(AppendEntriesRequestProto request) {
+    rpcService.getProxies().resetProxy(follower.getPeer().getId());
+    appendLogRequestObserver = null;
+    firstResponseReceived = false;
+
+    // clear the pending requests queue and reset the next index of follower
+    final long nextIndex = request != null && request.hasPreviousLog()?
+        request.getPreviousLog().getIndex() + 1: raftLog.getStartIndex();
+    clearPendingRequests(nextIndex);
+  }
+
+  @Override
+  protected void runAppenderImpl() throws IOException {
+    for(; isAppenderRunning(); mayWait()) {
+      if (shouldSendRequest()) {
+        SnapshotInfo snapshot = shouldInstallSnapshot();
+        if (snapshot != null) {
+          installSnapshot(snapshot);
+        } else if (!shouldWait()) {
+          // keep appending log entries or sending heartbeats
+          appendLog();
+        }
+      }
+      checkSlowness();
+    }
+
+    Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
+  }
+
+  private long getWaitTimeMs() {
+    if (!shouldSendRequest()) {
+      return getHeartbeatRemainingTime(); // No requests, wait until heartbeat
+    } else if (shouldWait()) {
+      return halfMinTimeoutMs; // Should wait for a short time
+    }
+    return 0L;
+  }
+
+  private void mayWait() {
+    // use lastSend time instead of lastResponse time
+    final long waitTimeMs = getWaitTimeMs();
+    if (waitTimeMs <= 0L) {
+      return;
+    }
+
+    synchronized(this) {
+      try {
+        LOG.trace("{}: wait {}ms", this, waitTimeMs);
+        wait(waitTimeMs);
+      } catch(InterruptedException ie) {
+        LOG.warn(this + ": Wait interrupted by " + ie);
+      }
+    }
+  }
+
+  @Override
+  protected boolean shouldSendRequest() {
+    return appendLogRequestObserver == null || super.shouldSendRequest();
+  }
+
+  /** @return true iff not received first response or queue is full. */
+  private boolean shouldWait() {
+    final int size = pendingRequests.size();
+    if (size == 0) {
+      return false;
+    }
+    return !firstResponseReceived || size >= maxPendingRequestsNum;
+  }
+
+  private void appendLog() throws IOException {
+    final AppendEntriesRequestProto pending;
+    final StreamObserver<AppendEntriesRequestProto> s;
+    synchronized (this) {
+      // prepare and enqueue the append request. note changes on follower's
+      // nextIndex and ops on pendingRequests should always be associated
+      // together and protected by the lock
+      pending = createRequest(callId++);
+      if (pending == null) {
+        return;
+      }
+      pendingRequests.put(pending.getServerRequest().getCallId(), pending);
+      updateNextIndex(pending);
+      if (appendLogRequestObserver == null) {
+        appendLogRequestObserver = getClient().appendEntries(new AppendLogResponseHandler());
+      }
+      s = appendLogRequestObserver;
+    }
+
+    if (isAppenderRunning()) {
+      sendRequest(pending, s);
+    }
+  }
+
+  private void sendRequest(AppendEntriesRequestProto request,
+      StreamObserver<AppendEntriesRequestProto> s) {
+    CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
+        server.getId(), null, request);
+
+    s.onNext(request);
+    scheduler.onTimeout(requestTimeoutDuration, () -> timeoutAppendRequest(request), LOG,
+        () -> "Timeout check failed for append entry request: " + request);
+    follower.updateLastRpcSendTime();
+  }
+
+  private void timeoutAppendRequest(AppendEntriesRequestProto request) {
+    AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
+    if (pendingRequest != null) {
+      LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
+    }
+  }
+
+  private void updateNextIndex(AppendEntriesRequestProto request) {
+    final int count = request.getEntriesCount();
+    if (count > 0) {
+      follower.updateNextIndex(request.getEntries(count - 1).getIndex() + 1);
+    }
+  }
+
+  /**
+   * StreamObserver for handling responses from the follower
+   */
+  private class AppendLogResponseHandler
+      implements StreamObserver<AppendEntriesReplyProto> {
+    /**
+     * After receiving a appendEntries reply, do the following:
+     * 1. If the reply is success, update the follower's match index and submit
+     *    an event to leaderState
+     * 2. If the reply is NOT_LEADER, step down
+     * 3. If the reply is INCONSISTENCY, decrease the follower's next index
+     *    based on the response
+     */
+    @Override
+    public void onNext(AppendEntriesReplyProto reply) {
+      LOG.debug("{} received {} response from {}", server.getId(),
+          (!firstResponseReceived ? "the first" : "a"),
+          follower.getPeer());
+
+      // update the last rpc time
+      follower.updateLastRpcResponseTime();
+
+      if (!firstResponseReceived) {
+        firstResponseReceived = true;
+      }
+      switch (reply.getResult()) {
+        case SUCCESS:
+          onSuccess(reply);
+          break;
+        case NOT_LEADER:
+          onNotLeader(reply);
+          break;
+        case INCONSISTENCY:
+          onInconsistency(reply);
+          break;
+        default:
+          break;
+      }
+      notifyAppend();
+    }
+
+    /**
+     * for now we simply retry the first pending request
+     */
+    @Override
+    public void onError(Throwable t) {
+      if (!isAppenderRunning()) {
+        LOG.info("{} is stopped", GrpcLogAppender.this);
+        return;
+      }
+      GrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
+
+      long callId = GrpcUtil.getCallId(t);
+      resetClient(pendingRequests.get(callId));
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("{} stops appending log entries to follower {}", server.getId(),
+          follower);
+    }
+  }
+
+  private void clearPendingRequests(long newNextIndex) {
+    pendingRequests.clear();
+    follower.decreaseNextIndex(newNextIndex);
+  }
+
+  protected synchronized void onSuccess(AppendEntriesReplyProto reply) {
+    AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
+    if (request == null) {
+      // If reply comes after timeout, the reply is ignored.
+      LOG.warn("{}: Request not found, ignoring reply: {}", this, ServerProtoUtils.toString(reply));
+      return;
+    }
+    updateCommitIndex(request.getLeaderCommit());
+
+    final long replyNextIndex = reply.getNextIndex();
+    final long lastIndex = replyNextIndex - 1;
+    final boolean updateMatchIndex;
+
+    if (request.getEntriesCount() == 0) {
+      Preconditions.assertTrue(!request.hasPreviousLog() ||
+              lastIndex == request.getPreviousLog().getIndex(),
+          "reply's next index is %s, request's previous is %s",
+          replyNextIndex, request.getPreviousLog());
+      updateMatchIndex = request.hasPreviousLog() && follower.getMatchIndex() < lastIndex;
+    } else {
+      // check if the reply and the pending request is consistent
+      final long lastEntryIndex = request
+          .getEntries(request.getEntriesCount() - 1).getIndex();
+      Preconditions.assertTrue(lastIndex == lastEntryIndex,
+          "reply's next index is %s, request's last entry index is %s",
+          replyNextIndex, lastEntryIndex);
+      updateMatchIndex = true;
+    }
+    if (updateMatchIndex) {
+      follower.updateMatchIndex(lastIndex);
+      submitEventOnSuccessAppend();
+    }
+  }
+
+  private void onNotLeader(AppendEntriesReplyProto reply) {
+    checkResponseTerm(reply.getTerm());
+    // the running loop will end and the connection will onComplete
+  }
+
+  private synchronized void onInconsistency(AppendEntriesReplyProto reply) {
+    AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
+    if (request == null) {
+      // If reply comes after timeout, the reply is ignored.
+      LOG.warn("{}: Ignoring {}", server.getId(), reply);
+      return;
+    }
+    Preconditions.assertTrue(request.hasPreviousLog());
+    if (request.getPreviousLog().getIndex() >= reply.getNextIndex()) {
+      clearPendingRequests(reply.getNextIndex());
+    }
+  }
+
+  private class InstallSnapshotResponseHandler
+      implements StreamObserver<InstallSnapshotReplyProto> {
+    private final Queue<Integer> pending;
+    private final AtomicBoolean done = new AtomicBoolean(false);
+
+    InstallSnapshotResponseHandler() {
+      pending = new LinkedList<>();
+    }
+
+    synchronized void addPending(InstallSnapshotRequestProto request) {
+      pending.offer(request.getRequestIndex());
+    }
+
+    synchronized void removePending(InstallSnapshotReplyProto reply) {
+      int index = pending.poll();
+      Preconditions.assertTrue(index == reply.getRequestIndex());
+    }
+
+    boolean isDone() {
+      return done.get();
+    }
+
+    void close() {
+      done.set(true);
+      GrpcLogAppender.this.notifyAppend();
+    }
+
+    synchronized boolean hasAllResponse() {
+      return pending.isEmpty();
+    }
+
+    @Override
+    public void onNext(InstallSnapshotReplyProto reply) {
+      LOG.debug("{} received {} response from {}", server.getId(),
+          (!firstResponseReceived ? "the first" : "a"),
+          follower.getPeer());
+
+      // update the last rpc time
+      follower.updateLastRpcResponseTime();
+
+      if (!firstResponseReceived) {
+        firstResponseReceived = true;
+      }
+
+      switch (reply.getResult()) {
+        case SUCCESS:
+          removePending(reply);
+          break;
+        case NOT_LEADER:
+          checkResponseTerm(reply.getTerm());
+          break;
+        case UNRECOGNIZED:
+          break;
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      if (!isAppenderRunning()) {
+        LOG.info("{} is stopped", GrpcLogAppender.this);
+        return;
+      }
+      LOG.info("{} got error when installing snapshot to {}, exception: {}",
+          server.getId(), follower.getPeer(), t);
+      resetClient(null);
+      close();
+    }
+
+    @Override
+    public void onCompleted() {
+      LOG.info("{} stops sending snapshots to follower {}", server.getId(),
+          follower);
+      close();
+    }
+  }
+
+  private void installSnapshot(SnapshotInfo snapshot) {
+    LOG.info("{}: follower {}'s next index is {}," +
+            " log's start index is {}, need to install snapshot",
+        server.getId(), follower.getPeer(), follower.getNextIndex(),
+        raftLog.getStartIndex());
+
+    final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler();
+    StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null;
+    final String requestId = UUID.randomUUID().toString();
+    try {
+      snapshotRequestObserver = getClient().installSnapshot(responseHandler);
+      for (InstallSnapshotRequestProto request :
+          new SnapshotRequestIter(snapshot, requestId)) {
+        if (isAppenderRunning()) {
+          snapshotRequestObserver.onNext(request);
+          follower.updateLastRpcSendTime();
+          responseHandler.addPending(request);
+        } else {
+          break;
+        }
+      }
+      snapshotRequestObserver.onCompleted();
+    } catch (Exception e) {
+      LOG.warn("{} failed to install snapshot {}. Exception: {}", this,
+          snapshot.getFiles(), e);
+      if (snapshotRequestObserver != null) {
+        snapshotRequestObserver.onError(e);
+      }
+      return;
+    }
+
+    synchronized (this) {
+      while (isAppenderRunning() && !responseHandler.isDone()) {
+        try {
+          wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+    }
+
+    if (responseHandler.hasAllResponse()) {
+      follower.updateMatchIndex(snapshot.getTermIndex().getIndex());
+      follower.updateNextIndex(snapshot.getTermIndex().getIndex() + 1);
+      LOG.info("{}: install snapshot-{} successfully on follower {}",
+          server.getId(), snapshot.getTermIndex().getIndex(), follower.getPeer());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
new file mode 100644
index 0000000..3b2f8ba
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.TimeDuration;
+
+import java.io.Closeable;
+
+/**
+ * A gRPC client for the raft server protocol service of a single target peer.
+ * It offers a blocking requestVote call and the streaming appendEntries and
+ * installSnapshot calls. Closing the client shuts down the underlying channel.
+ */
+public class GrpcServerProtocolClient implements Closeable {
+  private final ManagedChannel channel;
+  private final TimeDuration requestTimeoutDuration;
+  private final RaftServerProtocolServiceBlockingStub blockingStub;
+  private final RaftServerProtocolServiceStub asyncStub;
+
+  /**
+   * Builds a plaintext channel to the address of the given peer and creates
+   * a blocking stub and an async stub on top of it.
+   *
+   * @param target the peer to connect to
+   * @param flowControlWindow the flow control window passed to the channel builder
+   * @param requestTimeoutDuration the deadline applied to requestVote and installSnapshot calls
+   */
+  public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
+      TimeDuration requestTimeoutDuration) {
+    this.requestTimeoutDuration = requestTimeoutDuration;
+    this.channel = NettyChannelBuilder.forTarget(target.getAddress())
+        .usePlaintext(true)
+        .flowControlWindow(flowControlWindow)
+        .build();
+    this.blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(this.channel);
+    this.asyncStub = RaftServerProtocolServiceGrpc.newStub(this.channel);
+  }
+
+  /** Shuts down the channel immediately. */
+  @Override
+  public void close() {
+    channel.shutdownNow();
+  }
+
+  /**
+   * Sends a requestVote request through the blocking stub, using the request timeout as the deadline.
+   *
+   * @param request the request to send
+   * @return the reply returned by the stub
+   */
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
+    // A StatusRuntimeException thrown by the stub is left for the caller to handle.
+    return blockingStub
+        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .requestVote(request);
+  }
+
+  /**
+   * Opens an appendEntries stream through the async stub; no deadline is set on this call.
+   *
+   * @param responseHandler the observer receiving the replies
+   * @return the observer used to send the requests
+   */
+  StreamObserver<AppendEntriesRequestProto> appendEntries(
+      StreamObserver<AppendEntriesReplyProto> responseHandler) {
+    final StreamObserver<AppendEntriesRequestProto> requestObserver = asyncStub.appendEntries(responseHandler);
+    return requestObserver;
+  }
+
+  /**
+   * Opens an installSnapshot stream through the async stub, using the request timeout as the deadline.
+   *
+   * @param responseHandler the observer receiving the replies
+   * @return the observer used to send the requests
+   */
+  StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
+    final RaftServerProtocolServiceStub stubWithDeadline = asyncStub.withDeadlineAfter(
+        requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit());
+    return stubWithDeadline.installSnapshot(responseHandler);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
new file mode 100644
index 0000000..83335b8
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcUtil;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * The gRPC service for the raft server protocol: it receives requestVote,
+ * appendEntries and installSnapshot calls and delegates them to the given {@link RaftServer}.
+ */
+public class GrpcServerProtocolService extends RaftServerProtocolServiceImplBase {
+  public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
+
+  private final Supplier<RaftPeerId> idSupplier;
+  private final RaftServer server;
+
+  /**
+   * @param idSupplier supplies the id used as the prefix of the log messages
+   * @param server the server which processes the requests
+   */
+  public GrpcServerProtocolService(Supplier<RaftPeerId> idSupplier, RaftServer server) {
+    this.idSupplier = idSupplier;
+    this.server = server;
+  }
+
+  RaftPeerId getId() {
+    return idSupplier.get();
+  }
+
+  /**
+   * A failed {@link CompletableFuture} stage reports the failure of an earlier stage
+   * wrapped in a CompletionException; return the underlying cause if there is one.
+   */
+  private static Throwable unwrap(Throwable t) {
+    final boolean wrapped = t instanceof java.util.concurrent.CompletionException && t.getCause() != null;
+    return wrapped ? t.getCause() : t;
+  }
+
+  /**
+   * Processes a requestVote request synchronously and sends either the reply
+   * followed by onCompleted, or the wrapped exception via onError.
+   */
+  @Override
+  public void requestVote(RequestVoteRequestProto request,
+      StreamObserver<RequestVoteReplyProto> responseObserver) {
+    try {
+      final RequestVoteReplyProto reply = server.requestVote(request);
+      responseObserver.onNext(reply);
+      responseObserver.onCompleted();
+    } catch (Throwable e) {
+      GrpcUtil.warn(LOG, () -> getId() + ": Failed requestVote " + ProtoUtils.toString(request.getServerRequest()), e);
+      responseObserver.onError(GrpcUtil.wrapException(e));
+    }
+  }
+
+  /**
+   * Handles a stream of appendEntries requests.
+   * The requests are processed asynchronously but the replies are sent in request order:
+   * the reply of a request is sent only after the previous request has been fully handled.
+   * Once the stream is closed (completed or failed, in either direction),
+   * no further reply is written to the response observer.
+   */
+  @Override
+  public StreamObserver<AppendEntriesRequestProto> appendEntries(
+      StreamObserver<AppendEntriesReplyProto> responseObserver) {
+    return new StreamObserver<AppendEntriesRequestProto>() {
+      /** Completed when the most recently received request has been fully handled. */
+      private final AtomicReference<CompletableFuture<Void>> previousOnNext =
+          new AtomicReference<>(CompletableFuture.completedFuture(null));
+      /** True once the response observer has been terminated or the inbound stream has failed. */
+      private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+      /** Logs the failure and terminates the response stream with it, at most once. */
+      private void closeWithError(AppendEntriesRequestProto request, Throwable e) {
+        GrpcUtil.warn(LOG, () -> getId() + ": Failed appendEntries " + ProtoUtils.toString(request.getServerRequest()), e);
+        if (isClosed.compareAndSet(false, true)) {
+          responseObserver.onError(GrpcUtil.wrapException(e, request.getServerRequest().getCallId()));
+        }
+      }
+
+      @Override
+      public void onNext(AppendEntriesRequestProto request) {
+        final CompletableFuture<Void> current = new CompletableFuture<>();
+        final CompletableFuture<Void> previous = previousOnNext.getAndSet(current);
+        try {
+          server.appendEntriesAsync(request)
+              .thenAcceptBoth(previous, (reply, v) -> {
+                if (!isClosed.get()) {
+                  responseObserver.onNext(reply);
+                }
+              })
+              .whenComplete((v, e) -> {
+                // The combining stage above is skipped when the request fails asynchronously.
+                // Always complete "current" here; otherwise every later reply,
+                // which waits for it, would be blocked forever.
+                try {
+                  if (e != null) {
+                    closeWithError(request, unwrap(e));
+                  }
+                } finally {
+                  current.complete(null);
+                }
+              });
+        } catch (Throwable e) {
+          try {
+            closeWithError(request, e);
+          } finally {
+            // Complete normally so that the chain of later requests is not left failed;
+            // their replies are dropped anyway since the stream is now closed.
+            current.complete(null);
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        GrpcUtil.warn(LOG, () -> getId() + ": appendEntries onError", t);
+        // The inbound stream has been terminated; do not write pending replies.
+        isClosed.set(true);
+      }
+
+      @Override
+      public void onCompleted() {
+        if (isClosed.compareAndSet(false, true)) {
+          LOG.info("{}: appendEntries completed", getId());
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+
+  /**
+   * Handles a stream of installSnapshot requests. Each request is processed synchronously
+   * and its reply is sent before returning. The response observer is terminated at most once;
+   * after that, nothing more is written to it.
+   */
+  @Override
+  public StreamObserver<InstallSnapshotRequestProto> installSnapshot(
+      StreamObserver<InstallSnapshotReplyProto> responseObserver) {
+    return new StreamObserver<InstallSnapshotRequestProto>() {
+      /** True once the response observer has been terminated or the inbound stream has failed. */
+      private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+      @Override
+      public void onNext(InstallSnapshotRequestProto request) {
+        try {
+          final InstallSnapshotReplyProto reply = server.installSnapshot(request);
+          if (!isClosed.get()) {
+            responseObserver.onNext(reply);
+          }
+        } catch (Throwable e) {
+          GrpcUtil.warn(LOG, () -> getId() + ": Failed installSnapshot " + ProtoUtils.toString(request.getServerRequest()), e);
+          if (isClosed.compareAndSet(false, true)) {
+            responseObserver.onError(GrpcUtil.wrapException(e));
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        GrpcUtil.warn(LOG, () -> getId() + ": installSnapshot onError", t);
+        // The inbound stream has been terminated; do not write further replies.
+        isClosed.set(true);
+      }
+
+      @Override
+      public void onCompleted() {
+        if (isClosed.compareAndSet(false, true)) {
+          LOG.info("{}: installSnapshot completed", getId());
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
new file mode 100644
index 0000000..eb8310c
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.shaded.proto.RaftProtos.*;
+import org.apache.ratis.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.function.Supplier;
+
+/**
+ * The gRPC-based {@link RaftServerRpc}.
+ * It runs a Netty gRPC server hosting the server, client and admin protocol services
+ * and keeps a map of {@link GrpcServerProtocolClient} proxies for talking to the peers.
+ */
+public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>> {
+  static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
+  /** The injection point name passed to {@link CodeInjectionForTesting} before sending a server request. */
+  public static final String GRPC_SEND_SERVER_REQUEST =
+      GrpcService.class.getSimpleName() + ".sendRequest";
+
+  /** The builder of {@link GrpcService}; obtain one from {@link #newBuilder()}. */
+  public static class Builder extends RaftServerRpc.Builder<Builder, GrpcService> {
+    private Builder() {
+    }
+
+    @Override
+    public Builder getThis() {
+      return this;
+    }
+
+    @Override
+    public GrpcService build() {
+      return new GrpcService(getServer());
+    }
+  }
+
+  /** @return a new {@link Builder}. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  private final Server server;
+  private final Supplier<InetSocketAddress> addressSupplier;
+
+  /** Reads all the settings from the properties of the given server. */
+  private GrpcService(RaftServer server) {
+    this(server, server::getId,
+        GrpcConfigKeys.Server.port(server.getProperties()),
+        GrpcConfigKeys.messageSizeMax(server.getProperties(), LOG::info),
+        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()),
+        GrpcConfigKeys.flowControlWindow(server.getProperties(), LOG::info),
+        RaftServerConfigKeys.Rpc.requestTimeout(server.getProperties()));
+  }
+
+  /**
+   * Builds (but does not start) the gRPC server.
+   *
+   * @throws IllegalArgumentException if the appender buffer capacity is larger than the max gRPC message size
+   */
+  private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier, int port,
+      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize,
+      SizeInBytes flowControlWindow, TimeDuration requestTimeoutDuration) {
+    super(idSupplier, selfId -> new PeerProxyMap<>(selfId.toString(),
+        peer -> new GrpcServerProtocolClient(peer, flowControlWindow.getSizeInt(), requestTimeoutDuration)));
+
+    final boolean bufferExceedsMessageMax = appenderBufferSize.getSize() > grpcMessageSizeMax.getSize();
+    if (bufferExceedsMessageMax) {
+      final String message = "Illegal configuration: "
+          + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+          + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax;
+      throw new IllegalArgumentException(message);
+    }
+
+    server = NettyServerBuilder.forPort(port)
+        .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
+        .flowControlWindow(flowControlWindow.getSizeInt())
+        .addService(new GrpcServerProtocolService(idSupplier, raftServer))
+        .addService(new GrpcClientProtocolService(idSupplier, raftServer))
+        .addService(new GrpcAdminProtocolService(raftServer))
+        .build();
+    // A configured port of 0 means that the actual port is only known from the server itself.
+    addressSupplier = JavaUtils.memoize(() -> {
+      final int actualPort = port != 0 ? port : server.getPort();
+      return new InetSocketAddress(actualPort);
+    });
+  }
+
+  @Override
+  public SupportedRpcType getRpcType() {
+    return SupportedRpcType.GRPC;
+  }
+
+  /** Starts the gRPC server; a failure to start is passed to {@link ExitUtils#terminate}. */
+  @Override
+  public void startImpl() {
+    try {
+      server.start();
+    } catch (IOException ioe) {
+      ExitUtils.terminate(1, "Failed to start Grpc server", ioe, LOG);
+    }
+    LOG.info("{}: {} started, listening on {}", getId(), getClass().getSimpleName(), getInetSocketAddress());
+  }
+
+  /**
+   * Shuts down the gRPC server immediately, runs the close logic of the super class
+   * and then waits for the server to terminate.
+   *
+   * @throws IOException if the wait for the termination is interrupted, or thrown by the super class
+   */
+  @Override
+  public void closeImpl() throws IOException {
+    final String description = getId() + ": shutdown server with port " + server.getPort();
+    LOG.info("{} now", description);
+    final Server terminating = server.shutdownNow();
+    super.closeImpl();
+    try {
+      terminating.awaitTermination();
+    } catch (InterruptedException ie) {
+      throw IOUtils.toInterruptedIOException(description + " failed", ie);
+    }
+    LOG.info("{} successfully", description);
+  }
+
+  /** @return the memoized address: the configured port if it is non-zero, otherwise the port of the server. */
+  @Override
+  public InetSocketAddress getInetSocketAddress() {
+    return addressSupplier.get();
+  }
+
+  /** Not supported by this implementation; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public AppendEntriesReplyProto appendEntries(
+      AppendEntriesRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking AppendEntries call is not supported");
+  }
+
+  /** Not supported by this implementation; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public InstallSnapshotReplyProto installSnapshot(
+      InstallSnapshotRequestProto request) throws IOException {
+    throw new UnsupportedOperationException(
+        "Blocking InstallSnapshot call is not supported");
+  }
+
+  /** Sends the request, through the corresponding proxy, to the peer named by the reply id of the request. */
+  @Override
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request)
+      throws IOException {
+    CodeInjectionForTesting.execute(GRPC_SEND_SERVER_REQUEST, getId(), null, request);
+
+    final GrpcServerProtocolClient proxy = getProxies().getProxy(
+        RaftPeerId.valueOf(request.getServerRequest().getReplyId()));
+    return proxy.requestVote(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ed8e60da/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
deleted file mode 100644
index b801c2a..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/RaftServerProtocolClient.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
-import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceBlockingStub;
-import org.apache.ratis.shaded.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.util.TimeDuration;
-
-import java.io.Closeable;
-
-/**
- * This is a RaftClient implementation that supports streaming data to the raft
- * ring. The stream implementation utilizes gRPC.
- */
-public class RaftServerProtocolClient implements Closeable {
-  private final ManagedChannel channel;
-  private final TimeDuration requestTimeoutDuration;
-  private final RaftServerProtocolServiceBlockingStub blockingStub;
-  private final RaftServerProtocolServiceStub asyncStub;
-
-  public RaftServerProtocolClient(RaftPeer target, int flowControlWindow,
-      TimeDuration requestTimeoutDuration) {
-    channel = NettyChannelBuilder.forTarget(target.getAddress())
-        .usePlaintext(true).flowControlWindow(flowControlWindow)
-        .build();
-    blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
-    asyncStub = RaftServerProtocolServiceGrpc.newStub(channel);
-    this.requestTimeoutDuration = requestTimeoutDuration;
-  }
-
-  @Override
-  public void close() {
-    channel.shutdownNow();
-  }
-
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
-    // the StatusRuntimeException will be handled by the caller
-    RequestVoteReplyProto r =
-        blockingStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-            .requestVote(request);
-    return r;
-  }
-
-  StreamObserver<AppendEntriesRequestProto> appendEntries(
-      StreamObserver<AppendEntriesReplyProto> responseHandler) {
-    return asyncStub.appendEntries(responseHandler);
-  }
-
-  StreamObserver<InstallSnapshotRequestProto> installSnapshot(
-      StreamObserver<InstallSnapshotReplyProto> responseHandler) {
-    return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .installSnapshot(responseHandler);
-  }
-}