You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/05/19 18:23:42 UTC

[2/2] incubator-ratis git commit: RATIS-86. Support raft server re-initialization. Contributed by Tsz Wo Nicholas Sze.

RATIS-86. Support raft server re-initialization. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/06002e67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/06002e67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/06002e67

Branch: refs/heads/master
Commit: 06002e67a3476a491e4fc0f1638123f5957c452e
Parents: 291f51b
Author: Jing Zhao <ji...@apache.org>
Authored: Fri May 19 11:23:33 2017 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri May 19 11:23:33 2017 -0700

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  3 +
 .../ratis/client/impl/ClientProtoUtils.java     | 22 +++++
 .../ratis/client/impl/RaftClientImpl.java       | 22 ++++-
 .../protocol/AdminAsynchronousProtocol.java     | 27 +++++++
 .../apache/ratis/protocol/AdminProtocol.java    | 25 ++++++
 .../ratis/protocol/ReinitializeRequest.java     | 39 +++++++++
 .../org/apache/ratis/util/CheckedSupplier.java  | 30 +++++++
 .../java/org/apache/ratis/util/IOUtils.java     |  4 +-
 .../java/org/apache/ratis/util/JavaUtils.java   | 63 +++++++++++++++
 .../org/apache/ratis/grpc/RaftGRpcService.java  |  2 +
 .../org/apache/ratis/grpc/RaftGrpcUtil.java     | 32 ++++++--
 .../apache/ratis/grpc/client/GrpcClientRpc.java |  7 +-
 .../grpc/client/RaftClientProtocolClient.java   | 26 ++++--
 .../grpc/client/RaftClientProtocolService.java  | 27 ++-----
 .../ratis/grpc/server/AdminProtocolService.java | 45 +++++++++++
 .../grpc/TestReinitializationWithGrpc.java      | 28 +++++++
 .../apache/ratis/hadooprpc/HadoopConstants.java |  4 +-
 .../client/CombinedClientProtocol.java          | 25 ++++++
 ...nedClientProtocolClientSideTranslatorPB.java | 83 +++++++++++++++++++
 .../client/CombinedClientProtocolPB.java        | 37 +++++++++
 ...nedClientProtocolServerSideTranslatorPB.java | 82 +++++++++++++++++++
 .../ratis/hadooprpc/client/HadoopClientRpc.java | 10 ++-
 ...aftClientProtocolClientSideTranslatorPB.java | 70 ----------------
 .../hadooprpc/client/RaftClientProtocolPB.java  | 37 ---------
 ...aftClientProtocolServerSideTranslatorPB.java | 69 ----------------
 .../hadooprpc/server/HadoopRpcService.java      | 18 ++---
 .../TestReinitializationWithHadoopRpc.java      | 28 +++++++
 .../ratis/netty/client/NettyClientRpc.java      |  8 +-
 .../ratis/netty/server/NettyRpcService.java     |  9 +++
 .../netty/TestReinitializationWithNetty.java    | 28 +++++++
 ratis-proto-shaded/src/main/proto/GRpc.proto    |  6 ++
 ratis-proto-shaded/src/main/proto/Hadoop.proto  |  6 +-
 ratis-proto-shaded/src/main/proto/Netty.proto   |  1 +
 ratis-proto-shaded/src/main/proto/Raft.proto    |  6 ++
 .../org/apache/ratis/server/RaftServer.java     |  8 +-
 .../apache/ratis/server/impl/LeaderState.java   |  3 +-
 .../ratis/server/impl/PeerConfiguration.java    |  1 -
 .../ratis/server/impl/RaftServerImpl.java       |  2 +-
 .../ratis/server/impl/RaftServerProxy.java      | 85 +++++++++++++++++---
 .../java/org/apache/ratis/MiniRaftCluster.java  | 24 +++---
 .../java/org/apache/ratis/RaftBasicTests.java   |  2 +-
 .../java/org/apache/ratis/RaftTestUtil.java     | 23 +++++-
 .../impl/RaftReconfigurationBaseTest.java       |  2 +-
 .../server/impl/ReinitializationBaseTest.java   | 84 +++++++++++++++++++
 .../ratis/server/simulation/SimulatedRpc.java   |  1 -
 .../server/simulation/SimulatedServerRpc.java   | 27 +++----
 .../TestReinitializationWithSimulatedRpc.java   | 28 +++++++
 47 files changed, 938 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 8f3c465..956a6de 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -52,6 +52,9 @@ public interface RaftClient extends Closeable {
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 
+  /** Send reinitialize request to the service. */
+  RaftClientReply reinitialize(RaftPeer[] serversInNewConf, RaftPeerId server) throws IOException;
+
   /** @return a {@link Builder}. */
   static Builder newBuilder() {
     return new Builder();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 62a9ee4..ff49109 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -185,4 +185,26 @@ public class ClientProtoUtils {
             Arrays.asList(request.getPeersInNewConf())))
         .build();
   }
+
+  public static ReinitializeRequest toReinitializeRequest(
+      ReinitializeRequestProto p) {
+    final RaftRpcRequestProto m = p.getRpcRequest();
+    final RaftPeer[] peers = ProtoUtils.toRaftPeerArray(p.getPeersList());
+    return new ReinitializeRequest(
+        new ClientId(m.getRequestorId().toByteArray()),
+        RaftPeerId.valueOf(m.getReplyId()),
+        p.getRpcRequest().getCallId(), peers);
+  }
+
+  public static ReinitializeRequestProto toReinitializeRequestProto(
+      ReinitializeRequest request) {
+    return ReinitializeRequestProto.newBuilder()
+        .setRpcRequest(toRaftRpcRequestProtoBuilder(
+            request.getClientId().toBytes(),
+            request.getServerId().toBytes(),
+            request.getCallId()))
+        .addAllPeers(ProtoUtils.toRaftPeerProtos(
+            Arrays.asList(request.getPeersInNewConf())))
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 2125ce0..40e670d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
+import java.util.stream.Collector;
 import java.util.stream.Collectors;
 
 /** A client who sends requests to a raft service. */
@@ -86,12 +87,25 @@ final class RaftClientImpl implements RaftClient {
       throws IOException {
     final long callId = nextCallId();
     // also refresh the rpc proxies for these peers
-    clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
-        .collect(Collectors.toCollection(ArrayList::new)));
+    addServers(peersInNewConf);
     return sendRequestWithRetry(() -> new SetConfigurationRequest(
         clientId, leaderId, callId, peersInNewConf));
   }
 
+  @Override
+  public RaftClientReply reinitialize(RaftPeer[] peersInNewConf, RaftPeerId server)
+      throws IOException {
+    final long callId = nextCallId();
+    addServers(peersInNewConf);
+    return sendRequest(new ReinitializeRequest(
+        clientId, server, callId, peersInNewConf));
+  }
+
+  private void addServers(RaftPeer[] peersInNewConf) {
+    clientRpc.addServers(Arrays.stream(peersInNewConf).filter(peers::contains)
+        .collect(Collectors.toList()));
+  }
+
   private RaftClientReply sendRequestWithRetry(
       Supplier<RaftClientRequest> supplier)
       throws InterruptedIOException, StateMachineException {
@@ -157,6 +171,10 @@ final class RaftClientImpl implements RaftClient {
       RaftPeerId newLeader) {
     LOG.debug("{}: suggested new leader: {}. Failed with {}", clientId,
         newLeader, ioe);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Stack trace", new Throwable("TRACE"));
+    }
+
     final RaftPeerId oldLeader = request.getServerId();
     if (newLeader == null && oldLeader.equals(leaderId)) {
       newLeader = CollectionUtils.next(oldLeader, CollectionUtils.as(peers, RaftPeer::getId));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
new file mode 100644
index 0000000..663751b
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminAsynchronousProtocol.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+/** Asynchronous version of {@link AdminProtocol}. */
+public interface AdminAsynchronousProtocol {
+  CompletableFuture<RaftClientReply> reinitializeAsync(
+      ReinitializeRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
new file mode 100644
index 0000000..0e7d6b6
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AdminProtocol.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.io.IOException;
+
+/** For server administration. */
+public interface AdminProtocol {
+  RaftClientReply reinitialize(ReinitializeRequest request) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
new file mode 100644
index 0000000..0a89340
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import java.util.Arrays;
+
+public class ReinitializeRequest extends RaftClientRequest {
+  private final RaftPeer[] peers;
+
+  public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
+                             long callId, RaftPeer[] peers) {
+    super(clientId, serverId, callId, null);
+    this.peers = peers;
+  }
+
+  public RaftPeer[] getPeersInNewConf() {
+    return peers;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + ", peers:" + Arrays.asList(getPeersInNewConf());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
new file mode 100644
index 0000000..0c9de31
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedSupplier.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.function.Supplier;
+
+/** Function with a throws-clause. */
+@FunctionalInterface
+public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
+  /**
+   * The same as {@link Supplier#get()}
+   * except that this method is declared with a throws-clause.
+   */
+  OUTPUT get() throws THROWABLE;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index ba5e78e..4976be8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -43,7 +43,9 @@ public interface IOUtils {
   }
 
   static IOException asIOException(Throwable t) {
-    return t instanceof IOException? (IOException)t : new IOException(t);
+    return t == null? null
+        : t instanceof IOException? (IOException)t
+        : new IOException(t);
   }
 
   static IOException toIOException(ExecutionException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
new file mode 100644
index 0000000..5da2012
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -0,0 +1,63 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * General Java utility methods.
+ */
+public interface JavaUtils {
+  Logger LOG = LoggerFactory.getLogger(JavaUtils.class);
+
+  /**
+   * Invoke {@link Callable#call()} and, if there any,
+   * wrap the checked exception by {@link RuntimeException}.
+   */
+  static <T> T callAsUnchecked(Callable<T> callable) {
+    try {
+      return callable.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Get the value from the future and then consume it.
+   */
+  static <T> void getAndConsume(CompletableFuture<T> future, Consumer<T> consumer) {
+    final T t;
+    try {
+      t = future.get();
+    } catch (Exception ignored) {
+      LOG.warn("Failed to get()", ignored);
+      return;
+    }
+    consumer.accept(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 0deb3f4..96a7a45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.grpc.client.RaftClientProtocolService;
+import org.apache.ratis.grpc.server.AdminProtocolService;
 import org.apache.ratis.grpc.server.RaftServerProtocolClient;
 import org.apache.ratis.grpc.server.RaftServerProtocolService;
 import org.apache.ratis.protocol.RaftPeer;
@@ -82,6 +83,7 @@ public class RaftGRpcService implements RaftServerRpc {
     server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
         .addService(new RaftServerProtocolService(selfId, raftServer))
         .addService(new RaftClientProtocolService(selfId, raftServer))
+        .addService(new AdminProtocolService(selfId, raftServer))
         .build();
 
     // start service to determine the port (in case port is configured as 0)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
index b89c297..fdc9ce8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcUtil.java
@@ -17,20 +17,26 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.shaded.io.grpc.Metadata;
 import org.apache.ratis.shaded.io.grpc.Status;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.util.CheckedSupplier;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.ReflectionUtils;
 import org.apache.ratis.util.StringUtils;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 
-public class RaftGrpcUtil {
-  public static final Metadata.Key<String> EXCEPTION_TYPE_KEY =
+public interface RaftGrpcUtil {
+  Metadata.Key<String> EXCEPTION_TYPE_KEY =
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
 
-  public static StatusRuntimeException wrapException(Throwable t) {
+  static StatusRuntimeException wrapException(Throwable t) {
     Metadata trailers = new Metadata();
     trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
     return new StatusRuntimeException(
@@ -38,7 +44,7 @@ public class RaftGrpcUtil {
         trailers);
   }
 
-  public static IOException unwrapException(StatusRuntimeException se) {
+  static IOException unwrapException(StatusRuntimeException se) {
     final Metadata trailers = se.getTrailers();
     final Status status = se.getStatus();
     if (trailers != null && status != null) {
@@ -57,7 +63,7 @@ public class RaftGrpcUtil {
     return new IOException(se);
   }
 
-  public static IOException unwrapIOException(Throwable t) {
+  static IOException unwrapIOException(Throwable t) {
     final IOException e;
     if (t instanceof StatusRuntimeException) {
       e = RaftGrpcUtil.unwrapException((StatusRuntimeException) t);
@@ -67,4 +73,20 @@ public class RaftGrpcUtil {
     return e;
   }
 
+  static void asyncCall(
+      StreamObserver<RaftClientReplyProto> responseObserver,
+      CheckedSupplier<CompletableFuture<RaftClientReply>, IOException> supplier) {
+    try {
+      supplier.get().whenCompleteAsync((reply, exception) -> {
+        if (exception != null) {
+          responseObserver.onError(RaftGrpcUtil.wrapException(exception));
+        } else {
+          responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
+          responseObserver.onCompleted();
+        }
+      });
+    } catch (Exception e) {
+      responseObserver.onError(RaftGrpcUtil.wrapException(e));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index b28415c..b30640b 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -22,6 +22,7 @@ import org.apache.ratis.grpc.RaftGrpcUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
@@ -48,7 +49,11 @@ public class GrpcClientRpc implements RaftClientRpc {
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
     final RaftClientProtocolClient proxy = proxies.getProxy(serverId);
-    if (request instanceof SetConfigurationRequest) {
+    if (request instanceof ReinitializeRequest) {
+      RaftProtos.ReinitializeRequestProto proto =
+          toReinitializeRequestProto((ReinitializeRequest) request);
+      return toRaftClientReply(proxy.reinitialize(proto));
+    } else if (request instanceof SetConfigurationRequest) {
       SetConfigurationRequestProto setConf =
           toSetConfigurationRequestProto((SetConfigurationRequest) request);
       return toRaftClientReply(proxy.setConfiguration(setConf));

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index 74fb253..21254f3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,18 +17,22 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceBlockingStub;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.CheckedSupplier;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -38,6 +42,7 @@ public class RaftClientProtocolClient implements Closeable {
   private final ManagedChannel channel;
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
+  private final AdminProtocolServiceBlockingStub adminBlockingStub;
 
   public RaftClientProtocolClient(RaftPeer target) {
     this.target = target;
@@ -45,6 +50,7 @@ public class RaftClientProtocolClient implements Closeable {
         .usePlaintext(true).build();
     blockingStub = RaftClientProtocolServiceGrpc.newBlockingStub(channel);
     asyncStub = RaftClientProtocolServiceGrpc.newStub(channel);
+    adminBlockingStub = AdminProtocolServiceGrpc.newBlockingStub(channel);
   }
 
   @Override
@@ -52,12 +58,22 @@ public class RaftClientProtocolClient implements Closeable {
     channel.shutdownNow();
   }
 
-  public RaftClientReplyProto setConfiguration(
+  RaftClientReplyProto reinitialize(
+      ReinitializeRequestProto request) throws IOException {
+    return blockingCall(() -> adminBlockingStub.reinitialize(request));
+  }
+
+  RaftClientReplyProto setConfiguration(
       SetConfigurationRequestProto request) throws IOException {
+    return blockingCall(() -> blockingStub.setConfiguration(request));
+  }
+
+  private static RaftClientReplyProto blockingCall(
+      CheckedSupplier<RaftClientReplyProto, StatusRuntimeException> supplier
+      ) throws IOException {
     try {
-      return blockingStub.setConfiguration(request);
+      return supplier.get();
     } catch (StatusRuntimeException e) {
-      // unwrap StatusRuntimeException
       throw RaftGrpcUtil.unwrapException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 97e32c1..e11a9cf 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -17,16 +17,17 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,22 +75,10 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
   }
 
   @Override
-  public void setConfiguration(SetConfigurationRequestProto request,
+  public void setConfiguration(SetConfigurationRequestProto proto,
       StreamObserver<RaftClientReplyProto> responseObserver) {
-    try {
-      CompletableFuture<RaftClientReply> future = protocol.setConfigurationAsync(
-          ClientProtoUtils.toSetConfigurationRequest(request));
-      future.whenCompleteAsync((reply, exception) -> {
-        if (exception != null) {
-          responseObserver.onError(RaftGrpcUtil.wrapException(exception));
-        } else {
-          responseObserver.onNext(ClientProtoUtils.toRaftClientReplyProto(reply));
-          responseObserver.onCompleted();
-        }
-      });
-    } catch (Exception e) {
-      responseObserver.onError(RaftGrpcUtil.wrapException(e));
-    }
+    final SetConfigurationRequest request = ClientProtoUtils.toSetConfigurationRequest(proto);
+    RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.setConfigurationAsync(request));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
new file mode 100644
index 0000000..4e7ff9a
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/AdminProtocolService.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.AdminAsynchronousProtocol;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+import org.apache.ratis.shaded.proto.grpc.AdminProtocolServiceGrpc.AdminProtocolServiceImplBase;
+
+public class AdminProtocolService extends AdminProtocolServiceImplBase {
+  private final RaftPeerId id;
+  private final AdminAsynchronousProtocol protocol;
+
+  public AdminProtocolService(RaftPeerId id, AdminAsynchronousProtocol protocol) {
+    this.id = id;
+    this.protocol = protocol;
+  }
+
+  @Override
+  public void reinitialize(ReinitializeRequestProto proto,
+                           StreamObserver<RaftClientReplyProto> responseObserver) {
+    final ReinitializeRequest request = ClientProtoUtils.toReinitializeRequest(proto);
+    RaftGrpcUtil.asyncCall(responseObserver, () -> protocol.reinitializeAsync(request));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
new file mode 100644
index 0000000..27cbf1e
--- /dev/null
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestReinitializationWithGrpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithGrpc extends ReinitializationBaseTest {
+  @Override
+  public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+    return MiniRaftClusterWithGRpc.FACTORY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
index a50b938..1f9c15d 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConstants.java
@@ -24,6 +24,6 @@ public interface HadoopConstants {
       = "raft.client.kerberos.principal";
   String RAFT_SERVER_PROTOCOL_NAME
       = "org.apache.hadoop.raft.server.protocol.RaftServerProtocol";
-  String RAFT_CLIENT_PROTOCOL_NAME
-      = "org.apache.hadoop.raft.protocol.RaftClientProtocol";
+  String COMBINED_CLIENT_PROTOCOL_NAME
+      = "org.apache.ratis.hadooprpc.client.CombinedClientProtocol";
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
new file mode 100644
index 0000000..6281987
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocol.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.ratis.protocol.AdminProtocol;
+import org.apache.ratis.protocol.RaftClientProtocol;
+
+public interface CombinedClientProtocol
+    extends RaftClientProtocol, AdminProtocol {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000..8d1eff2
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolClientSideTranslatorPB.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.hadooprpc.Proxy;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.ReinitializeRequest;
+import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.shaded.com.google.common.base.Function;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.util.CheckedFunction;
+import org.apache.ratis.util.ProtoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+@InterfaceAudience.Private
+public class CombinedClientProtocolClientSideTranslatorPB
+    extends Proxy<CombinedClientProtocolPB>
+    implements CombinedClientProtocol {
+  private static final Logger LOG = LoggerFactory.getLogger(CombinedClientProtocolClientSideTranslatorPB.class);
+
+  public CombinedClientProtocolClientSideTranslatorPB(
+      String addressStr, Configuration conf) throws IOException {
+    super(CombinedClientProtocolPB.class, addressStr, conf);
+  }
+
+  @Override
+  public RaftClientReply submitClientRequest(RaftClientRequest request)
+      throws IOException {
+    return handleRequest(request, ClientProtoUtils::toRaftClientRequestProto,
+        p -> getProtocol().submitClientRequest(null, p));
+  }
+
+  @Override
+  public RaftClientReply setConfiguration(SetConfigurationRequest request)
+      throws IOException {
+    return handleRequest(request, ClientProtoUtils::toSetConfigurationRequestProto,
+        p -> getProtocol().setConfiguration(null, p));
+  }
+
+  @Override
+  public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
+    return handleRequest(request, ClientProtoUtils::toReinitializeRequestProto,
+      p -> getProtocol().reinitialize(null, p));
+  }
+
+  static <REQUEST extends RaftClientRequest, PROTO> RaftClientReply handleRequest(
+      REQUEST request, Function<REQUEST, PROTO> toProto,
+      CheckedFunction<PROTO, RaftClientReplyProto, ServiceException> handler)
+      throws IOException {
+    final PROTO proto = toProto.apply(request);
+    try {
+      final RaftClientReplyProto reply = handler.apply(proto);
+      return ClientProtoUtils.toRaftClientReply(reply);
+    } catch (ServiceException se) {
+      LOG.trace("Failed to handle " + request, se);
+      throw ProtoUtils.toIOException(se);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
new file mode 100644
index 0000000..e9af3b0
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolPB.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.ratis.hadooprpc.HadoopConstants;
+import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@KerberosInfo(
+    serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY)
+@ProtocolInfo(
+    protocolName = HadoopConstants.COMBINED_CLIENT_PROTOCOL_NAME,
+    protocolVersion = 1)
+public interface CombinedClientProtocolPB extends
+    CombinedClientProtocolService.BlockingInterface {
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..ef9e733
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/CombinedClientProtocolServerSideTranslatorPB.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.shaded.com.google.protobuf.RpcController;
+import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
+
+@InterfaceAudience.Private
+public class CombinedClientProtocolServerSideTranslatorPB
+    implements CombinedClientProtocolPB {
+  private final RaftServer impl;
+
+  public CombinedClientProtocolServerSideTranslatorPB(RaftServer impl) {
+    this.impl = impl;
+  }
+
+  @Override
+  public RaftClientReplyProto submitClientRequest(
+      RpcController unused, RaftClientRequestProto proto)
+      throws ServiceException {
+    final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto);
+    try {
+      final RaftClientReply reply = impl.submitClientRequest(request);
+      return ClientProtoUtils.toRaftClientReplyProto(reply);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public RaftClientReplyProto setConfiguration(
+      RpcController unused, SetConfigurationRequestProto proto)
+      throws ServiceException {
+    final SetConfigurationRequest request;
+    try {
+      request = ClientProtoUtils.toSetConfigurationRequest(proto);
+      final RaftClientReply reply = impl.setConfiguration(request);
+      return ClientProtoUtils.toRaftClientReplyProto(reply);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
+  @Override
+  public RaftClientReplyProto reinitialize(
+      RpcController controller, ReinitializeRequestProto proto)
+      throws ServiceException {
+    final ReinitializeRequest request;
+    try {
+      request = ClientProtoUtils.toReinitializeRequest(proto);
+      final RaftClientReply reply = impl.reinitialize(request);
+      return ClientProtoUtils.toRaftClientReplyProto(reply);
+    } catch(IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
index 25c0ecd..3a2d6fc 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRpc.java
@@ -27,21 +27,23 @@ import java.io.IOException;
 
 public class HadoopClientRpc implements RaftClientRpc {
 
-  private final PeerProxyMap<RaftClientProtocolClientSideTranslatorPB> proxies;
+  private final PeerProxyMap<CombinedClientProtocolClientSideTranslatorPB> proxies;
 
   public HadoopClientRpc(final Configuration conf) {
     this.proxies  = new PeerProxyMap<>(
-        p -> new RaftClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
+        p -> new CombinedClientProtocolClientSideTranslatorPB(p.getAddress(), conf));
   }
 
   @Override
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
-    final RaftClientProtocolClientSideTranslatorPB proxy =
+    final CombinedClientProtocolClientSideTranslatorPB proxy =
         proxies.getProxy(serverId);
     try {
-      if (request instanceof SetConfigurationRequest) {
+      if (request instanceof ReinitializeRequest) {
+        return proxy.reinitialize((ReinitializeRequest) request);
+      } else if (request instanceof SetConfigurationRequest) {
         return proxy.setConfiguration((SetConfigurationRequest) request);
       } else {
         return proxy.submitClientRequest(request);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
deleted file mode 100644
index a5c1a13..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolClientSideTranslatorPB.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.hadooprpc.Proxy;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.util.ProtoUtils;
-
-@InterfaceAudience.Private
-public class RaftClientProtocolClientSideTranslatorPB
-    extends Proxy<RaftClientProtocolPB>
-    implements RaftClientProtocol {
-
-  public RaftClientProtocolClientSideTranslatorPB(
-      String addressStr, Configuration conf) throws IOException {
-    super(RaftClientProtocolPB.class, addressStr, conf);
-  }
-
-  @Override
-  public RaftClientReply submitClientRequest(RaftClientRequest request)
-      throws IOException {
-    final RaftClientRequestProto p = ClientProtoUtils.toRaftClientRequestProto(request);
-    try {
-      final RaftClientReplyProto reply = getProtocol().submitClientRequest(null, p);
-      return ClientProtoUtils.toRaftClientReply(reply);
-    } catch (ServiceException se) {
-      throw ProtoUtils.toIOException(se);
-    }
-  }
-
-  @Override
-  public RaftClientReply setConfiguration(SetConfigurationRequest request)
-      throws IOException {
-    final SetConfigurationRequestProto p
-        = ClientProtoUtils.toSetConfigurationRequestProto(request);
-    try {
-      final RaftClientReplyProto reply = getProtocol().setConfiguration(null, p);
-      return ClientProtoUtils.toRaftClientReply(reply);
-    } catch (ServiceException se) {
-      throw ProtoUtils.toIOException(se);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
deleted file mode 100644
index 908cd99..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolPB.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.security.KerberosInfo;
-import org.apache.ratis.hadooprpc.HadoopConstants;
-import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-@KerberosInfo(
-    serverPrincipal = HadoopConstants.RAFT_SERVER_KERBEROS_PRINCIPAL_KEY,
-    clientPrincipal = HadoopConstants.RAFT_CLIENT_KERBEROS_PRINCIPAL_KEY)
-@ProtocolInfo(
-    protocolName = HadoopConstants.RAFT_CLIENT_PROTOCOL_NAME,
-    protocolVersion = 1)
-public interface RaftClientProtocolPB extends
-    RaftClientProtocolService.BlockingInterface {
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
deleted file mode 100644
index 08cf589..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/RaftClientProtocolServerSideTranslatorPB.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.client;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.shaded.com.google.protobuf.RpcController;
-import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
-
-@InterfaceAudience.Private
-public class RaftClientProtocolServerSideTranslatorPB
-    implements RaftClientProtocolPB {
-  private final RaftClientProtocol impl;
-
-  public RaftClientProtocolServerSideTranslatorPB(RaftClientProtocol impl) {
-    this.impl = impl;
-  }
-
-  @Override
-  public RaftClientReplyProto submitClientRequest(
-      RpcController unused, RaftClientRequestProto proto)
-      throws ServiceException {
-    final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(proto);
-    try {
-      final RaftClientReply reply = impl.submitClientRequest(request);
-      return ClientProtoUtils.toRaftClientReplyProto(reply);
-    } catch(IOException ioe) {
-      throw new ServiceException(ioe);
-    }
-  }
-
-  @Override
-  public RaftClientReplyProto setConfiguration(
-      RpcController unused, SetConfigurationRequestProto proto)
-      throws ServiceException {
-    final SetConfigurationRequest request;
-    try {
-      request = ClientProtoUtils.toSetConfigurationRequest(proto);
-      final RaftClientReply reply = impl.setConfiguration(request);
-      return ClientProtoUtils.toRaftClientReplyProto(reply);
-    } catch(IOException ioe) {
-      throw new ServiceException(ioe);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index e31a03a..15afde8 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -22,9 +22,8 @@ import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.ratis.hadooprpc.HadoopConfigKeys;
 import org.apache.ratis.hadooprpc.Proxy;
-import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
-import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
-import org.apache.ratis.protocol.RaftClientProtocol;
+import org.apache.ratis.hadooprpc.client.CombinedClientProtocolPB;
+import org.apache.ratis.hadooprpc.client.CombinedClientProtocolServerSideTranslatorPB;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -35,7 +34,7 @@ import org.apache.ratis.shaded.com.google.protobuf.BlockingService;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.ServiceException;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
-import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftClientProtocolService;
+import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.CombinedClientProtocolService;
 import org.apache.ratis.shaded.proto.hadoop.HadoopProtos.RaftServerProtocolService;
 import org.apache.ratis.util.CheckedFunction;
 import org.apache.ratis.util.CodeInjectionForTesting;
@@ -138,13 +137,12 @@ public class HadoopRpcService implements RaftServerRpc {
         .build();
   }
 
-  private void addRaftClientProtocol(RaftClientProtocol clientProtocol, Configuration conf) {
-    final Class<?> protocol = RaftClientProtocolPB.class;
-    RPC.setProtocolEngine(conf,protocol, ProtobufRpcEngineShaded.class);
+  private void addRaftClientProtocol(RaftServer server, Configuration conf) {
+    final Class<?> protocol = CombinedClientProtocolPB.class;
+    RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngineShaded.class);
 
-    final BlockingService service
-        = RaftClientProtocolService.newReflectiveBlockingService(
-        new RaftClientProtocolServerSideTranslatorPB(clientProtocol));
+    final BlockingService service = CombinedClientProtocolService.newReflectiveBlockingService(
+        new CombinedClientProtocolServerSideTranslatorPB(server));
     ipcServer.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
new file mode 100644
index 0000000..6efb012
--- /dev/null
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/TestReinitializationWithHadoopRpc.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithHadoopRpc extends ReinitializationBaseTest {
+  @Override
+  public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+    return MiniRaftClusterWithHadoopRpc.FACTORY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
index 74afddc..14218ad 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.netty.NettyRpcProxy;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReinitializeRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.shaded.proto.netty.NettyProtos.RaftNettyServerRequestProto;
 
@@ -38,7 +39,12 @@ public class NettyClientRpc implements RaftClientRpc {
 
     final RaftNettyServerRequestProto.Builder b = RaftNettyServerRequestProto.newBuilder();
     final RaftRpcRequestProto rpcRequest;
-    if (request instanceof SetConfigurationRequest) {
+    if (request instanceof ReinitializeRequest) {
+      final ReinitializeRequestProto proto = ClientProtoUtils.toReinitializeRequestProto(
+          (ReinitializeRequest)request);
+      b.setReinitializeRequest(proto);
+      rpcRequest = proto.getRpcRequest();
+    } else if (request instanceof SetConfigurationRequest) {
       final SetConfigurationRequestProto proto = ClientProtoUtils.toSetConfigurationRequestProto(
           (SetConfigurationRequest)request);
       b.setSetConfigurationRequest(proto);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 9504241..b8028b6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -200,6 +200,15 @@ public final class NettyRpcService implements RaftServerRpc {
               .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
               .build();
         }
+        case REINITIALIZEREQUEST: {
+          final ReinitializeRequestProto request = proto.getReinitializeRequest();
+          rpcRequest = request.getRpcRequest();
+          final RaftClientReply reply = server.reinitialize(
+              ClientProtoUtils.toReinitializeRequest(request));
+          return RaftNettyServerReplyProto.newBuilder()
+              .setRaftClientReply(ClientProtoUtils.toRaftClientReplyProto(reply))
+              .build();
+        }
         case RAFTNETTYSERVERREQUEST_NOT_SET:
           throw new IllegalArgumentException("Request case not set in proto: "
               + proto.getRaftNettyServerRequestCase());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
new file mode 100644
index 0000000..c378749
--- /dev/null
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/TestReinitializationWithNetty.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.netty;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.server.impl.ReinitializationBaseTest;
+
+public class TestReinitializationWithNetty extends ReinitializationBaseTest {
+  @Override
+  public MiniRaftCluster.Factory<? extends MiniRaftCluster> getClusterFactory() {
+    return MiniRaftClusterWithNetty.FACTORY;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/GRpc.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/GRpc.proto b/ratis-proto-shaded/src/main/proto/GRpc.proto
index 267f579..599227b 100644
--- a/ratis-proto-shaded/src/main/proto/GRpc.proto
+++ b/ratis-proto-shaded/src/main/proto/GRpc.proto
@@ -43,3 +43,9 @@ service RaftServerProtocolService {
   rpc installSnapshot(stream ratis.common.InstallSnapshotRequestProto)
       returns(ratis.common.InstallSnapshotReplyProto) {}
 }
+
+service AdminProtocolService {
+  // A client-to-server RPC to reinitialize the server
+  rpc reinitialize(ratis.common.ReinitializeRequestProto)
+      returns(ratis.common.RaftClientReplyProto) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Hadoop.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Hadoop.proto b/ratis-proto-shaded/src/main/proto/Hadoop.proto
index b85b9a2..48cfbf4 100644
--- a/ratis-proto-shaded/src/main/proto/Hadoop.proto
+++ b/ratis-proto-shaded/src/main/proto/Hadoop.proto
@@ -24,12 +24,15 @@ package ratis.hadoop;
 
 import "Raft.proto";
 
-service RaftClientProtocolService {
+service CombinedClientProtocolService {
   rpc submitClientRequest(ratis.common.RaftClientRequestProto)
       returns(ratis.common.RaftClientReplyProto);
 
   rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
       returns(ratis.common.RaftClientReplyProto);
+
+  rpc reinitialize(ratis.common.ReinitializeRequestProto)
+      returns(ratis.common.RaftClientReplyProto);
 }
 
 service RaftServerProtocolService {
@@ -42,3 +45,4 @@ service RaftServerProtocolService {
   rpc installSnapshot(ratis.common.InstallSnapshotRequestProto)
       returns(ratis.common.InstallSnapshotReplyProto);
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Netty.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Netty.proto b/ratis-proto-shaded/src/main/proto/Netty.proto
index d1634d7..4de770f 100644
--- a/ratis-proto-shaded/src/main/proto/Netty.proto
+++ b/ratis-proto-shaded/src/main/proto/Netty.proto
@@ -35,6 +35,7 @@ message RaftNettyServerRequestProto {
     ratis.common.InstallSnapshotRequestProto installSnapshotRequest = 3;
     ratis.common.RaftClientRequestProto raftClientRequest = 4;
     ratis.common.SetConfigurationRequestProto setConfigurationRequest = 5;
+    ratis.common.ReinitializeRequestProto reinitializeRequest = 6;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index f8dcf62..b53181f 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -180,3 +180,9 @@ message SetConfigurationRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
   repeated RaftPeerProto peers = 2;
 }
+
+// reinitialize request
+message ReinitializeRequestProto {
+  RaftRpcRequestProto rpcRequest = 1;
+  repeated RaftPeerProto peers = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index dbd32b7..0899dd1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -19,10 +19,7 @@ package org.apache.ratis.server;
 
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
-import org.apache.ratis.protocol.RaftClientProtocol;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.impl.ServerImplUtils;
@@ -35,7 +32,8 @@ import java.util.Objects;
 
 /** Raft server interface */
 public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
-    RaftClientProtocol, RaftClientAsynchronousProtocol {
+    RaftClientProtocol, RaftClientAsynchronousProtocol,
+    AdminProtocol, AdminAsynchronousProtocol {
   /** @return the server ID. */
   RaftPeerId getId();
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 4c19b7e..e9784bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -448,8 +448,7 @@ public class LeaderState {
           }
           // the pending request handler will send NotLeaderException for
           // pending client requests when it stops
-          // TODO should close impl instead of proxy
-          server.getProxy().close();
+          server.shutdown();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 82f546b..06aeb62 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -38,7 +38,6 @@ class PeerConfiguration {
       map.put(p.getId(), p);
     }
     this.peers = Collections.unmodifiableMap(map);
-    Preconditions.assertTrue(!this.peers.isEmpty());
   }
 
   Collection<RaftPeer> getPeers() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d62b207..9685fbc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -445,7 +445,7 @@ public class RaftServerImpl implements RaftServerProtocol,
     return waitForReply(getId(), request, submitClientRequestAsync(request));
   }
 
-  private static RaftClientReply waitForReply(RaftPeerId id,
+  static RaftClientReply waitForReply(RaftPeerId id,
       RaftClientRequest request, CompletableFuture<RaftClientReply> future)
       throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 36adf11..5afb737 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -26,23 +26,29 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class RaftServerProxy implements RaftServer {
   public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class);
 
   private final RaftPeerId id;
-  private final RaftServerImpl impl;
   private final StateMachine stateMachine;
   private final RaftProperties properties;
 
   private final RaftServerRpc serverRpc;
   private final ServerFactory factory;
 
+  private volatile CompletableFuture<RaftServerImpl> impl;
+  private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
+
   RaftServerProxy(RaftPeerId id, StateMachine stateMachine,
                   RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
       throws IOException {
@@ -52,10 +58,15 @@ public class RaftServerProxy implements RaftServer {
 
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
     this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
-    this.impl = new RaftServerImpl(id, this, raftConf, properties);
+
+    this.impl = CompletableFuture.completedFuture(initImpl(raftConf));
     this.serverRpc = initRaftServerRpc(factory, this, raftConf);
   }
 
+  private RaftServerImpl initImpl(RaftConfiguration raftConf) throws IOException {
+    return new RaftServerImpl(id, this, raftConf, properties);
+  }
+
   private static RaftServerRpc initRaftServerRpc(
       ServerFactory factory, RaftServer server, RaftConfiguration raftConf) {
     final RaftServerRpc rpc = factory.newRaftServerRpc(server);
@@ -67,6 +78,11 @@ public class RaftServerProxy implements RaftServer {
   }
 
   @Override
+  public RaftPeerId getId() {
+    return id;
+  }
+
+  @Override
   public RpcType getRpcType() {
     return getFactory().getRpcType();
   }
@@ -90,24 +106,25 @@ public class RaftServerProxy implements RaftServer {
     return serverRpc;
   }
 
-  public RaftServerImpl getImpl() {
-    return impl;
+  public RaftServerImpl getImpl() throws IOException {
+    try {
+      return impl.get();
+    } catch (InterruptedException e) {
+      throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e);
+    } catch (ExecutionException e) {
+      throw IOUtils.asIOException(e);
+    }
   }
 
   @Override
   public void start() {
-    getImpl().start();
+    JavaUtils.getAndConsume(impl, RaftServerImpl::start);
     getServerRpc().start();
   }
 
   @Override
-  public RaftPeerId getId() {
-    return id;
-  }
-
-  @Override
   public void close() {
-    getImpl().shutdown();
+    JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown);
     try {
       getServerRpc().close();
     } catch (IOException ignored) {
@@ -133,6 +150,46 @@ public class RaftServerProxy implements RaftServer {
     return getImpl().setConfiguration(request);
   }
 
+  @Override
+  public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
+    return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request));
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> reinitializeAsync(
+      ReinitializeRequest request) throws IOException {
+    if (!reinitializeRequest.compareAndSet(null, request)) {
+      throw new IOException("Another reinitialize is already in progress.");
+    }
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        final CompletableFuture<RaftServerImpl> oldImpl = impl;
+        impl = new CompletableFuture<>();
+        JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown);
+
+        final RaftConfiguration newConf = RaftConfiguration.newBuilder()
+            .setConf(request.getPeersInNewConf()).build();
+        final RaftServerImpl newImpl;
+        try {
+          newImpl = initImpl(newConf);
+        } catch (IOException ioe) {
+          final RaftException re = new RaftException(
+              "Failed to reinitialize, request=" + request, ioe);
+          impl.completeExceptionally(new IOException(
+              "Server " + getId() + " is not initialized.", re));
+          return new RaftClientReply(request, re);
+        }
+
+        getServerRpc().addPeers(newConf.getPeers());
+        newImpl.start();
+        impl.complete(newImpl);
+        return new RaftClientReply(request, (Message) null);
+      } finally {
+        reinitializeRequest.set(null);
+      }
+    });
+  }
+
   /**
    * Handle a raft configuration change request from client.
    */
@@ -162,6 +219,10 @@ public class RaftServerProxy implements RaftServer {
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + ":" + getId().toString();
+    try {
+      return getImpl().toString();
+    } catch (IOException ignored) {
+      return getClass().getSimpleName() + ":" + getId();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index ef0e454..83fcf54 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -32,7 +32,6 @@ import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.statemachine.BaseStateMachine;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.*;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -335,7 +334,7 @@ public abstract class MiniRaftCluster {
 
   public RaftServerImpl getLeader() {
     final List<RaftServerImpl> leaders = new ArrayList<>();
-    getServersAliveStream()
+    getServerAliveStream()
         .filter(RaftServerImpl::isLeader)
         .forEach(s -> {
       if (leaders.isEmpty()) {
@@ -353,9 +352,9 @@ public abstract class MiniRaftCluster {
     });
     if (leaders.isEmpty()) {
       return null;
-    } else if (leaders.size() != 1) {
-      Assert.fail(printServers() + leaders.toString()
-          + "leaders.size() = " + leaders.size() + " != 1");
+    } else if (leaders.size() > 1) {
+      throw new IllegalStateException(printServers() + leaders
+          + ", leaders.size() = " + leaders.size() + " > 1");
     }
     return leaders.get(0);
   }
@@ -366,7 +365,7 @@ public abstract class MiniRaftCluster {
   }
 
   public List<RaftServerImpl> getFollowers() {
-    return getServersAliveStream()
+    return getServerAliveStream()
         .filter(RaftServerImpl::isFollower)
         .collect(Collectors.toList());
   }
@@ -376,13 +375,14 @@ public abstract class MiniRaftCluster {
   }
 
   public Iterable<RaftServerImpl> iterateServerImpls() {
-    return CollectionUtils.as(getServers(), RaftServerProxy::getImpl);
+    return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked);
   }
 
-  public Stream<RaftServerImpl> getServersAliveStream() {
-    return getServers().stream()
-        .map(RaftServerProxy::getImpl)
-        .filter(RaftServerImpl::isAlive);
+  public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
+    return servers.stream().map(RaftTestUtil::getImplAsUnchecked);
+  }
+  public Stream<RaftServerImpl> getServerAliveStream() {
+    return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
   }
 
   public RaftServerProxy getServer(RaftPeerId id) {
@@ -408,7 +408,7 @@ public abstract class MiniRaftCluster {
 
   public void shutdown() {
     LOG.info("Stopping " + getClass().getSimpleName());
-    getServersAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close);
+    getServerAliveStream().map(RaftServerImpl::getProxy).forEach(RaftServerProxy::close);
 
     if (ExitUtils.isTerminated()) {
       LOG.error("Test resulted in an unexpected exit",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/06002e67/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 9c69d03..26eda15 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -97,7 +97,7 @@ public abstract class RaftBasicTests {
     Thread.sleep(cluster.getMaxTimeout() + 100);
     LOG.info(cluster.printAllLogs());
 
-    cluster.getServersAliveStream()
+    cluster.getServerAliveStream()
         .map(s -> s.getState().getLog())
         .forEach(log -> RaftTestUtil.assertLogEntries(log,
             log.getEntries(1, Long.MAX_VALUE), 1, term, messages));