You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/09/03 09:01:18 UTC

incubator-ratis git commit: RATIS-300. Support multiple RaftServerImpl in RaftServerProxy. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master c6c9ddf4d -> acd507e6e


RATIS-300. Support multiple RaftServerImpl in RaftServerProxy. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/acd507e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/acd507e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/acd507e6

Branch: refs/heads/master
Commit: acd507e6e2a5520682aa6ba37c5042a377f01cd0
Parents: c6c9ddf
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Sep 3 14:30:58 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Sep 3 14:30:58 2018 +0530

----------------------------------------------------------------------
 .../ratis/protocol/AlreadyClosedException.java  |  27 ++
 .../java/org/apache/ratis/util/IOUtils.java     |  18 +-
 .../java/org/apache/ratis/util/JavaUtils.java   |  17 +-
 .../java/org/apache/ratis/util/LifeCycle.java   |   3 +-
 .../org/apache/ratis/util/Preconditions.java    |   8 +
 .../org/apache/ratis/TestRestartRaftPeer.java   |   4 +-
 .../ratis/examples/ParameterizedBaseTest.java   |   2 +-
 .../org/apache/ratis/grpc/RaftGRpcService.java  |  14 +-
 .../ratis/grpc/server/GRpcLogAppender.java      |   2 +-
 .../ratis/grpc/TestRaftServerWithGrpc.java      |   4 +-
 .../hadooprpc/server/HadoopRpcService.java      |   2 +-
 .../ratis/netty/server/NettyRpcService.java     |   2 +-
 .../org/apache/ratis/server/RaftServer.java     |   5 +-
 .../org/apache/ratis/server/RaftServerRpc.java  |   3 +-
 .../ratis/server/impl/RaftServerImpl.java       |  20 +-
 .../ratis/server/impl/RaftServerProxy.java      | 301 +++++++++++++------
 .../server/impl/RaftServerRpcWithProxy.java     |  11 +-
 .../org/apache/ratis/server/impl/RoleInfo.java  |   5 +
 .../ratis/server/impl/ServerImplUtils.java      |  23 +-
 .../apache/ratis/server/storage/RaftLog.java    |   2 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  79 ++---
 .../java/org/apache/ratis/RaftAsyncTests.java   |  18 +-
 .../java/org/apache/ratis/RaftBasicTests.java   |   4 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |  13 +-
 .../TestRaftServerLeaderElectionTimeout.java    |   3 +-
 .../ratis/TestRaftServerSlownessDetection.java  |   2 +-
 .../ratis/server/impl/LeaderElectionTests.java  |   5 +-
 .../impl/RaftReconfigurationBaseTest.java       |   3 +-
 .../ratis/server/impl/RaftServerTestUtil.java   |  10 +
 .../server/impl/ReinitializationBaseTest.java   |  11 +-
 .../server/simulation/SimulatedServerRpc.java   |  10 +-
 .../statemachine/RaftSnapshotBaseTest.java      |   3 +-
 .../ratis/statemachine/TestStateMachine.java    |  28 +-
 33 files changed, 426 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
new file mode 100644
index 0000000..85888a0
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * The corresponding object is already closed.
+ */
+public class AlreadyClosedException extends RaftException {
+  public AlreadyClosedException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 915f4a2..c560bc6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -20,6 +20,8 @@
 
 package org.apache.ratis.util;
 
+import org.slf4j.Logger;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -30,10 +32,10 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 
-import org.slf4j.Logger;
-
 /**
  * IO related utility methods.
  */
@@ -56,6 +58,18 @@ public interface IOUtils {
     return cause != null? asIOException(cause): new IOException(e);
   }
 
+  static <T> T getFromFuture(CompletableFuture<T> future, Object name) throws IOException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw toInterruptedIOException(name + " interrupted.", e);
+    } catch (ExecutionException e) {
+      throw toIOException(e);
+    } catch (CompletionException e) {
+      throw asIOException(JavaUtils.unwrapCompletionException(e));
+    }
+  }
+
   static boolean shouldReconnect(Exception e) {
     return ReflectionUtils.isInstance(e,
         SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 15cb4f6..dd8eb39 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 /**
@@ -67,12 +68,20 @@ public interface JavaUtils {
    * wrap the checked exception by {@link RuntimeException}.
    */
   static <T> T callAsUnchecked(Callable<T> callable) {
+    return callAsUnchecked(callable::call, RuntimeException::new);
+  }
+
+  static <OUTPUT, THROWABLE extends Throwable> OUTPUT callAsUnchecked(
+      CheckedSupplier<OUTPUT, THROWABLE> checkedSupplier,
+      Function<THROWABLE, ? extends RuntimeException> converter) {
     try {
-      return callable.call();
-    } catch (RuntimeException e) {
+      return checkedSupplier.get();
+    } catch(RuntimeException | Error e) {
       throw e;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    } catch(Throwable t) {
+      @SuppressWarnings("unchecked")
+      final THROWABLE casted = (THROWABLE)t;
+      throw converter.apply(casted);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index 93af1ef..f8f3648 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -158,12 +158,13 @@ public class LifeCycle {
   }
 
   /** Assert if the current state equals to one of the expected states. */
-  public <T extends Throwable> void assertCurrentState(
+  public <T extends Throwable> State assertCurrentState(
       BiFunction<String, State, T> newThrowable, State... expected) throws T {
     final State c = getCurrentState();
     if (!c.isOneOf(expected)) {
       throw newThrowable.apply(name, c);
     }
+    return c;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index f1d55b0..7af2201 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -72,4 +72,12 @@ public interface Preconditions {
       throw new IllegalStateException(String.valueOf(message.get()));
     }
   }
+
+  static void assertNull(Object object, String name) {
+    if (object != null) {
+      throw new IllegalStateException(
+          name + " is expected to be null but "
+              + name + " = " + object + " != null, class = " + object.getClass());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
index 4b7ed40..ccbbda0 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
@@ -93,14 +93,14 @@ public class TestRestartRaftPeer extends BaseTest {
     long lastAppliedIndex = 0;
     for (int i = 0; i < 10 && !catchup; i++) {
       Thread.sleep(500);
-      lastAppliedIndex = cluster.getServer(followerId).getImpl().getState().getLastAppliedIndex();
+      lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex();
       catchup = lastAppliedIndex >= 20;
     }
     Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
 
     // make sure the restarted peer's log segments is correct
     cluster.restartServer(followerId, false);
-    Assert.assertTrue(cluster.getServer(followerId).getImpl().getState().getLog()
+    Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
         .getLastEntryTermIndex().getIndex() >= 20);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 3ee5e05..057c73a 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -51,7 +51,7 @@ public abstract class ParameterizedBaseTest extends BaseTest {
   private static final AtomicReference<MiniRaftCluster> currentCluster = new AtomicReference<>();
 
   /** Set {@link #currentCluster} to the given cluster and start it if {@link #currentCluster} is changed. */
-  public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException {
+  public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException, IOException {
     final MiniRaftCluster previous = currentCluster.getAndSet(cluster);
     if (previous != cluster) {
       if (previous != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index fefec48..b3e514c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -111,11 +111,17 @@ public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolCl
   }
 
   @Override
-  public void closeImpl() {
-    if (server != null) {
-      server.shutdown();
-    }
+  public void closeImpl() throws IOException {
+    final String name = getId() + ": shutdown server with port " + server.getPort();
+    LOG.info("{} now", name);
+    final Server s = server.shutdownNow();
     super.closeImpl();
+    try {
+      s.awaitTermination();
+    } catch(InterruptedException e) {
+      throw IOUtils.toInterruptedIOException(name + " failed", e);
+    }
+    LOG.info("{} successfully", name);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 595e061..f060c24 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -303,7 +303,7 @@ public class GRpcLogAppender extends LogAppender {
     AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
     if (request == null) {
       // If reply comes after timeout, the reply is ignored.
-      LOG.warn("Ignoring reply: " + reply);
+      LOG.warn("{}: Ignoring {}", server.getId(), reply);
       return;
     }
     Preconditions.assertTrue(request.hasPreviousLog());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index e5e95ee..7173e1f 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -51,8 +51,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
     // the raft server proxy created earlier. Raft server proxy should close
     // the rpc server on failure.
     testFailureCase("start a new server with the same address",
-        () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null),
-        IOException.class, IOException.class, OverlappingFileLockException.class);
+        () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null).start(),
+        IOException.class, OverlappingFileLockException.class);
     // Try to start a raft server rpc at the leader address.
     cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 5e571d4..5d07752 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -149,7 +149,7 @@ public class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftServerPro
   }
 
   @Override
-  public void closeImpl() {
+  public void closeImpl() throws IOException {
     ipcServer.stop();
     super.closeImpl();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index ebe3184..45c0b77 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -134,7 +134,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
   }
 
   @Override
-  public void closeImpl() {
+  public void closeImpl() throws IOException {
     bossGroup.shutdownGracefully();
     workerGroup.shutdownGracefully();
     final ChannelFuture f = getChannel().close();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index acd54cb..e071d4b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LifeCycle;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -50,7 +51,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
   ServerFactory getFactory();
 
   /** Start this server. */
-  void start();
+  void start() throws IOException;
+
+  LifeCycle.State getLifeCycleState();
 
   /** @return a {@link Builder}. */
   static Builder newBuilder() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 33db36d..a85e606 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Objects;
 
@@ -51,7 +52,7 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl
   }
 
   /** Start the RPC service. */
-  void start();
+  void start() throws IOException;
 
   /** @return the address where this RPC server is listening to. */
   InetSocketAddress getInetSocketAddress();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fd0ee4a..e33177a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -166,8 +166,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     this.role.transitionRole(newRole);
   }
 
-  void start() {
-    lifeCycle.transition(STARTING);
+  boolean start() {
+    if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
+      return false;
+    }
+    LOG.info("{}: start {}", getId(), groupId);
     state.start();
     RaftConfiguration conf = getRaftConf();
     if (conf != null && conf.contains(getId())) {
@@ -179,6 +182,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     }
 
     registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter);
+    return true;
   }
 
   static boolean registerMBean(
@@ -235,6 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
   void shutdown() {
     lifeCycle.checkStateAndClose(() -> {
+      LOG.info("{}: shutdown {}", getId(), groupId);
       try {
         jmxAdapter.unregister();
       } catch (Exception ignored) {
@@ -490,8 +495,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
         peers.toArray(new RaftPeer[peers.size()]));
   }
 
-  private void assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
-    lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n
+  private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
+    return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n
         + " is not " + Arrays.toString(expected) + ": current state is " + c),
         expected);
   }
@@ -866,7 +871,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             + ", commits" + ProtoUtils.toString(commitInfos)
             + ", entries: " + ServerProtoUtils.toString(entries));
 
-    assertLifeCycleState(STARTING, RUNNING);
+    final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING);
+    if (currentState == STARTING) {
+      if (role.getCurrentRole() == null) {
+        throw new ServerNotReadyException("The role of Server " + getId() + " is not yet initialized.");
+      }
+    }
     assertGroup(leaderId, leaderGroupId);
 
     try {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 9db7735..27ec67f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -26,35 +26,132 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.CheckedFunction;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 public class RaftServerProxy implements RaftServer {
   public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class);
 
+  /**
+   * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
+   *
+   * The map is synchronized for mutations and the bulk {@link #getAll()} method
+   * but the (non-bulk) {@link #get(RaftGroupId)} and {@link #containsGroup(RaftGroupId)} methods are not.
+   * The thread safety and atomicity guarantees for the non-bulk methods are provided by {@link ConcurrentMap}.
+   */
+  class ImplMap implements Closeable {
+    private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
+    private boolean isClosed = false;
+
+    synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
+      if (isClosed) {
+        return JavaUtils.completeExceptionally(new AlreadyClosedException(
+            getId() + ": Failed to add " + group + " since the server is already closed"));
+      }
+      if (containsGroup(group.getGroupId())) {
+        return JavaUtils.completeExceptionally(new AlreadyExistsException(
+            getId() + ": Failed to add " + group + " since the group already exists in the map."));
+      }
+      final RaftGroupId groupId = group.getGroupId();
+      final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
+      final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
+      Preconditions.assertNull(previous, "previous");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
+      }
+      return newImpl;
+    }
+
+    synchronized CompletableFuture<RaftServerImpl> remove(RaftGroupId groupId) {
+      final CompletableFuture<RaftServerImpl> future = map.remove(groupId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("{}: remove {}", getId(), toString(groupId, future));
+      }
+      return future;
+    }
+
+    @Override
+    public synchronized void close() {
+      if (isClosed) {
+        LOG.info("{} is already closed.", getId());
+        return;
+      }
+      isClosed = true;
+      map.values().parallelStream().map(CompletableFuture::join).forEach(RaftServerImpl::shutdown);
+    }
+
+    synchronized List<CompletableFuture<RaftServerImpl>> getAll() {
+      return new ArrayList<>(map.values());
+    }
+
+    CompletableFuture<RaftServerImpl> get(RaftGroupId groupId) {
+      final CompletableFuture<RaftServerImpl> i = map.get(groupId);
+      if (i == null) {
+        return JavaUtils.completeExceptionally(new GroupMismatchException(
+            getId() + ": " + groupId + " not found."));
+      }
+      return i;
+    }
+
+    boolean containsGroup(RaftGroupId groupId) {
+      return map.containsKey(groupId);
+    }
+
+    @Override
+    public synchronized String toString() {
+      if (map.isEmpty()) {
+        return "<EMPTY>";
+      } else if (map.size() == 1) {
+        return toString(map.entrySet().iterator().next());
+      }
+      final StringBuilder b = new StringBuilder("[");
+      map.entrySet().forEach(e -> b.append("\n  ").append(toString(e)));
+      return b.append("] size=").append(map.size()).toString();
+    }
+
+    String toString(Map.Entry<RaftGroupId, CompletableFuture<RaftServerImpl>> e) {
+      return toString(e.getKey(), e.getValue());
+    }
+
+    String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {
+      return "" + (f != null && f.isDone()? f.join(): groupId + ":" + f);
+    }
+  }
+
   private final RaftPeerId id;
   private final RaftProperties properties;
   private final StateMachine.Registry stateMachineRegistry;
+  private final LifeCycle lifeCycle;
 
   private final RaftServerRpc serverRpc;
   private final ServerFactory factory;
 
-  private volatile CompletableFuture<RaftServerImpl> impl;
+  private final ImplMap impls = new ImplMap();
   private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
 
   RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
-      RaftGroup group, RaftProperties properties, Parameters parameters)
-      throws IOException {
+      RaftProperties properties, Parameters parameters) {
     this.properties = properties;
     this.stateMachineRegistry = stateMachineRegistry;
 
@@ -63,23 +160,18 @@ public class RaftServerProxy implements RaftServer {
 
     this.serverRpc = factory.newRaftServerRpc(this);
     this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
+    this.lifeCycle = new LifeCycle(this.id);
+  }
 
-    try {
-      this.impl = CompletableFuture.completedFuture(initImpl(group));
-    } catch (IOException ioe) {
+  private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
+    return CompletableFuture.supplyAsync(() -> {
       try {
-        serverRpc.close();
-      } catch (IOException closeIoe) {
-        LOG.warn(this.id + ": Failed to close server rpc.", closeIoe);
-        ioe.addSuppressed(closeIoe);
-      } finally {
-        throw ioe;
+        serverRpc.addPeers(group.getPeers());
+        return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+      } catch(IOException e) {
+        throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
       }
-    }
-  }
-
-  private RaftServerImpl initImpl(RaftGroup group) throws IOException {
-    return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+    });
   }
 
   private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -98,8 +190,9 @@ public class RaftServerProxy implements RaftServer {
     return id;
   }
 
+  @Override
   public Iterable<RaftGroupId> getGroupIds() throws IOException {
-    return Collections.singleton(getImpl().getGroupId());
+    return getImpls().stream().map(RaftServerImpl::getGroupId).collect(Collectors.toList());
   }
 
   @Override
@@ -121,54 +214,85 @@ public class RaftServerProxy implements RaftServer {
     return serverRpc;
   }
 
-  public RaftServerImpl getImpl() throws IOException {
-    final CompletableFuture<RaftServerImpl> i = impl;
-    if (i == null) {
-      throw new ServerNotReadyException(getId() + " is not initialized.");
-    }
-    try {
-      return i.get();
-    } catch (InterruptedException e) {
-      throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e);
-    } catch (ExecutionException e) {
-      throw IOUtils.asIOException(e);
+  public boolean containsGroup(RaftGroupId groupId) {
+    return impls.containsGroup(groupId);
+  }
+
+  CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+    return impls.addNew(group);
+  }
+
+  private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
+    return impls.get(groupId);
+  }
+
+  private RaftServerImpl getImpl(RaftRpcRequestProto proto) throws IOException {
+    return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId()));
+  }
+
+  RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
+    Objects.requireNonNull(groupId, "groupId == null");
+    return IOUtils.getFromFuture(getImplFuture(groupId), getId());
+  }
+
+  List<RaftServerImpl> getImpls() throws IOException {
+    final List<RaftServerImpl> list = new ArrayList<>();
+    for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
+      list.add(IOUtils.getFromFuture(f, getId()));
     }
+    return list;
   }
 
   @Override
-  public void start() {
-    LOG.info("{}: start", getId());
-    JavaUtils.getAndConsume(impl, RaftServerImpl::start);
-    getServerRpc().start();
+  public LifeCycle.State getLifeCycleState() {
+    return lifeCycle.getCurrentState();
+  }
+
+  @Override
+  public void start() throws IOException {
+    getImpls().parallelStream().forEach(RaftServerImpl::start);
+
+    lifeCycle.startAndTransition(() -> {
+      LOG.info("{}: start RPC server", getId());
+      getServerRpc().start();
+    }, IOException.class);
   }
 
   @Override
   public void close() {
-    LOG.info("{}: close", getId());
-    JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown);
-    try {
-      getServerRpc().close();
-    } catch (IOException ignored) {
-      LOG.warn("Failed to close RPC server for " + getId(), ignored);
-    }
+    lifeCycle.checkStateAndClose(() -> {
+      LOG.info("{}: close", getId());
+      impls.close();
+
+      try {
+        getServerRpc().close();
+      } catch(IOException ignored) {
+        LOG.warn(getId() + ": Failed to close " + getRpcType() + " server", ignored);
+      }
+    });
+  }
+
+  private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId,
+      CheckedFunction<RaftServerImpl, CompletableFuture<REPLY>, IOException> submitFunction) {
+    return getImplFuture(groupId).thenCompose(
+        impl -> JavaUtils.callAsUnchecked(() -> submitFunction.apply(impl), CompletionException::new));
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> submitClientRequestAsync(
-      RaftClientRequest request) throws IOException {
-    return getImpl().submitClientRequestAsync(request);
+  public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
+    return submitRequest(request.getRaftGroupId(), impl -> impl.submitClientRequestAsync(request));
   }
 
   @Override
   public RaftClientReply submitClientRequest(RaftClientRequest request)
       throws IOException {
-    return getImpl().submitClientRequest(request);
+    return getImpl(request.getRaftGroupId()).submitClientRequest(request);
   }
 
   @Override
   public RaftClientReply setConfiguration(SetConfigurationRequest request)
       throws IOException {
-    return getImpl().setConfiguration(request);
+    return getImpl(request.getRaftGroupId()).setConfiguration(request);
   }
 
   @Override
@@ -180,37 +304,29 @@ public class RaftServerProxy implements RaftServer {
   @Override
   public CompletableFuture<RaftClientReply> reinitializeAsync(
       ReinitializeRequest request) throws IOException {
-    LOG.info("{}: reinitializeAsync {}", getId(), request);
-    getImpl().assertGroup(request.getRequestorId(), request.getRaftGroupId());
+    LOG.info("{}: reinitialize* {}", getId(), request);
     if (!reinitializeRequest.compareAndSet(null, request)) {
       throw new IOException("Another reinitialize is already in progress.");
     }
-
-    return CompletableFuture.supplyAsync(() -> {
-      try {
-        final CompletableFuture<RaftServerImpl> oldImpl = impl;
-        impl = new CompletableFuture<>();
-        JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown);
-
-        final RaftServerImpl newImpl;
-        try {
-          newImpl = initImpl(request.getGroup());
-        } catch (IOException ioe) {
-          final RaftException re = new RaftException(
-              "Failed to reinitialize, request=" + request, ioe);
-          impl.completeExceptionally(new IOException(
-              "Server " + getId() + " is not initialized.", re));
-          return new RaftClientReply(request, re, null);
-        }
-
-        getServerRpc().addPeers(request.getGroup().getPeers());
-        newImpl.start();
-        impl.complete(newImpl);
-        return new RaftClientReply(request, newImpl.getCommitInfos());
-      } finally {
-        reinitializeRequest.set(null);
-      }
-    });
+    final RaftGroupId oldGroupId = request.getRaftGroupId();
+    return getImplFuture(oldGroupId)
+        .thenAcceptAsync(RaftServerImpl::shutdown)
+        .thenAccept(_1 -> impls.remove(oldGroupId))
+        .thenCompose(_1 -> impls.addNew(request.getGroup()))
+        .thenApply(newImpl -> {
+          LOG.debug("{}: newImpl = {}", getId(), newImpl);
+          final boolean started = newImpl.start();
+          Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl);
+          return new RaftClientReply(request, newImpl.getCommitInfos());
+        })
+        .whenComplete((_1, throwable) -> {
+          if (throwable != null) {
+            impls.remove(request.getGroup().getGroupId());
+            LOG.warn(getId() + ": Failed reinitialize* " + request, throwable);
+          }
+
+          reinitializeRequest.set(null);
+        });
   }
 
   @Override
@@ -223,48 +339,41 @@ public class RaftServerProxy implements RaftServer {
   @Override
   public CompletableFuture<ServerInformationReply> getInfoAsync(
       ServerInformationRequest request) {
-    return impl.thenApply(server -> server.getServerInformation(request));
+    return getImplFuture(request.getRaftGroupId()).thenApplyAsync(
+        server -> server.getServerInformation(request));
   }
 
   /**
    * Handle a raft configuration change request from client.
    */
   @Override
-  public CompletableFuture<RaftClientReply> setConfigurationAsync(
-      SetConfigurationRequest request) throws IOException {
-    return getImpl().setConfigurationAsync(request);
+  public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
+    return submitRequest(request.getRaftGroupId(), impl -> impl.setConfigurationAsync(request));
   }
 
   @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
-    return getImpl().requestVote(r);
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException {
+    return getImpl(request.getServerRequest()).requestVote(request);
   }
 
   @Override
-  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
-      AppendEntriesRequestProto r) throws IOException {
-    return getImpl().appendEntriesAsync(r);
+  public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
+    final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+    return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
   }
 
   @Override
-  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
-      throws IOException {
-    return getImpl().appendEntries(r);
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException {
+    return getImpl(request.getServerRequest()).appendEntries(request);
   }
 
   @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
-    return getImpl().installSnapshot(request);
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
+    return getImpl(request.getServerRequest()).installSnapshot(request);
   }
 
   @Override
   public String toString() {
-    try {
-      return getImpl().toString();
-    } catch (IOException ignored) {
-      return getClass().getSimpleName() + ":" + getId();
-    }
+    return getId() + String.format(":%9s ", lifeCycle.getCurrentState()) + impls;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
index aa8e2f7..68cf6fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.PeerProxyMap;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -64,18 +65,18 @@ public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES ex
   }
 
   @Override
-  public final void start() {
-    getLifeCycle().startAndTransition(() -> startImpl());
+  public final void start() throws IOException {
+    getLifeCycle().startAndTransition(this::startImpl, IOException.class);
   }
 
-  public abstract void startImpl();
+  public abstract void startImpl() throws IOException;
 
   @Override
-  public final void close() {
+  public final void close() throws IOException{
     getLifeCycle().checkStateAndClose(() -> closeImpl());
   }
 
-  public void closeImpl() {
+  public void closeImpl() throws IOException {
     getProxies().close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index e7a6e80..20ec951 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -59,4 +59,9 @@ public class RoleInfo {
   public boolean isLeader() {
     return role == RaftPeerRole.LEADER;
   }
+
+  @Override
+  public String toString() {
+    return "" + role;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index d9e0ee9..7b80dcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -34,27 +34,26 @@ public class ServerImplUtils {
   public static RaftServerProxy newRaftServer(
       RaftPeerId id, RaftGroup group, StateMachine stateMachine,
       RaftProperties properties, Parameters parameters) throws IOException {
-    return newRaftServer(id, group, gid -> stateMachine, properties, parameters);
+    RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group);
+    final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters);
+    if (group != null) {
+      proxy.addGroup(group);
+    }
+    return proxy;
   }
 
-  public static RaftServerProxy newRaftServer(
-      RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
-      RaftProperties properties, Parameters parameters) throws IOException {
+  private static RaftServerProxy newRaftServer(
+      RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
+      throws IOException {
     final RaftServerProxy proxy;
     try {
       // attempt multiple times to avoid temporary bind exception
       proxy = JavaUtils.attempt(
-          () -> new RaftServerProxy(id, stateMachineRegistry, group, properties, parameters),
+          () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters),
           5, 500L, "new RaftServerProxy", RaftServerProxy.LOG);
     } catch (InterruptedException e) {
       throw IOUtils.toInterruptedIOException(
-          "Interrupted when creating RaftServer " + id + ", " + group, e);
-    } catch (IOException e) {
-      throw new IOException("Failed to create RaftServer " + id + ", " + group, e);
-    }
-    // add peers into rpc service
-    if (!group.getPeers().isEmpty()) {
-      proxy.getServerRpc().addPeers(group.getPeers());
+          "Interrupted when creating RaftServer " + id, e);
     }
     return proxy;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 0d2ec4c..296dce6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -77,7 +77,7 @@ public abstract class RaftLog implements Closeable {
 
   public void checkLogState() {
     Preconditions.assertTrue(isOpen,
-        "The RaftLog has not been opened or has been closed");
+        () -> getSelfId() + ": The RaftLog has not been opened or has been closed");
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 3806bb8..8104938 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.storage.MemoryRaftLog;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
@@ -147,6 +148,7 @@ public abstract class MiniRaftCluster {
 
   protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
     this.group = initRaftGroup(Arrays.asList(ids));
+    LOG.info("new MiniRaftCluster {}", group);
     this.properties = new RaftProperties(properties);
     this.parameters = parameters;
 
@@ -160,6 +162,7 @@ public abstract class MiniRaftCluster {
   }
 
   public MiniRaftCluster initServers() {
+    LOG.info("servers = " + servers);
     if (servers.isEmpty()) {
       putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
     }
@@ -183,7 +186,7 @@ public abstract class MiniRaftCluster {
         .collect(Collectors.toList());
   }
 
-  public void start() {
+  public void start() throws IOException {
     LOG.info(".............................................................. ");
     LOG.info("... ");
     LOG.info("...     Starting " + getClass().getSimpleName());
@@ -191,7 +194,7 @@ public abstract class MiniRaftCluster {
     LOG.info(".............................................................. ");
 
     initServers();
-    servers.values().forEach(RaftServer::start);
+    startServers(servers.values());
   }
 
   /**
@@ -201,7 +204,7 @@ public abstract class MiniRaftCluster {
     killServer(newId);
     servers.remove(newId);
 
-    startServer(putNewServer(newId, format), true);
+    putNewServer(newId, format).start();
   }
 
   public void restart(boolean format) throws IOException {
@@ -217,8 +220,8 @@ public abstract class MiniRaftCluster {
     return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
   }
 
-  private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group,
-      boolean format) {
+  private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) {
+    LOG.info("newRaftServer: {}, {}, format? {}", id, group, format);
     try {
       final File dir = getStorageDir(id);
       if (format) {
@@ -280,14 +283,14 @@ public abstract class MiniRaftCluster {
     return addNewPeers(generateIds(number, servers.size()), startNewPeer);
   }
 
-  public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) {
+  public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
     LOG.info("Add new peers {}", Arrays.asList(ids));
 
     // create and add new RaftServers
     final Collection<RaftServerProxy> newServers = putNewServers(
         CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
 
-    newServers.forEach(s -> startServer(s, true));
+    startServers(newServers);
     if (!startNewPeer) {
       // start and then close, in order to bind the port
       newServers.forEach(p -> p.close());
@@ -301,16 +304,12 @@ public abstract class MiniRaftCluster {
     return new PeerChanges(p, np, new RaftPeer[0]);
   }
 
-  protected void startServer(RaftServer server, boolean startService) {
-    if (startService) {
-      server.start();
+  static void startServers(Iterable<? extends RaftServer> servers) throws IOException {
+    for(RaftServer s : servers) {
+      s.start();
     }
   }
 
-  public void startServer(RaftPeerId id) {
-    startServer(getServer(id), true);
-  }
-
   /**
    * prepare the peer list when removing some peers from the conf
    */
@@ -356,18 +355,7 @@ public abstract class MiniRaftCluster {
     } else {
       b.append("ALL groups");
     }
-    getServers().stream().filter(
-        s -> {
-          if (groupId == null) {
-            return true;
-          }
-          try {
-            return groupId.equals(s.getImpl().getGroupId());
-          } catch (IOException e) {
-            return false;
-          }
-        })
-        .forEach(s -> b.append("\n  ").append(s));
+    getRaftServerProxyStream(groupId).forEach(s -> b.append("\n  ").append(s));
     return b.toString();
   }
 
@@ -407,11 +395,7 @@ public abstract class MiniRaftCluster {
   }
 
   public RaftServerImpl getLeader(RaftGroupId groupId) {
-    Stream<RaftServerImpl> stream = getServerAliveStream();
-    if (groupId != null) {
-      stream = stream.filter(s -> groupId.equals(s.getGroupId()));
-    }
-    return getLeader(stream);
+    return getLeader(getServerAliveStream(groupId));
   }
 
   static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) {
@@ -439,7 +423,7 @@ public abstract class MiniRaftCluster {
     return leaders.get(0);
   }
 
-  boolean isLeader(String leaderId) throws InterruptedException {
+  boolean isLeader(String leaderId) {
     final RaftServerImpl leader = getLeader();
     return leader != null && leader.getId().toString().equals(leaderId);
   }
@@ -454,24 +438,41 @@ public abstract class MiniRaftCluster {
     return servers.values();
   }
 
-  public Iterable<RaftServerImpl> iterateServerImpls() {
-    return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked);
+  private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
+    return getServers().stream()
+        .filter(s -> groupId == null || s.containsGroup(groupId));
   }
 
-  public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
-    return servers.stream().map(RaftTestUtil::getImplAsUnchecked);
+  public Iterable<RaftServerImpl> iterateServerImpls() {
+    return CollectionUtils.as(getServers(), this::getRaftServerImpl);
   }
-  public Stream<RaftServerImpl> getServerStream() {
-    return getServerStream(getServers());
+
+  private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
+    final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
+    return groupId != null? stream.map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId))
+        : stream.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream());
   }
+
   public Stream<RaftServerImpl> getServerAliveStream() {
-    return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
+    return getServerAliveStream(getGroupId());
+  }
+
+  private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
+    return getServerStream(groupId).filter(RaftServerImpl::isAlive);
   }
 
   public RaftServerProxy getServer(RaftPeerId id) {
     return servers.get(id);
   }
 
+  public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
+    return getRaftServerImpl(servers.get(id));
+  }
+
+  private RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
+    return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
+  }
+
   public List<RaftPeer> getPeers() {
     return toRaftPeers(getServers());
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index b297a27..3d80f5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
@@ -90,6 +91,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     }
   }
 
+  static void setBlockTransaction(boolean block, MiniRaftCluster cluster) throws InterruptedException {
+    for (RaftServerProxy server : cluster.getServers()) {
+      final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
+      ((SimpleStateMachine4Testing)impl.getStateMachine()).setBlockTransaction(block);
+    }
+  }
+
   @Test
   public void testAsyncRequestSemaphore() throws Exception {
     LOG.info("Running testAsyncRequestSemaphore");
@@ -103,9 +111,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
     final RaftClient client = cluster.createClient();
     //Set blockTransaction flag so that transaction blocks
-    for (RaftServerProxy server : cluster.getServers()) {
-      ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(true);
-    }
+    setBlockTransaction(true, cluster);
 
     //Send numMessages which are blocked and do not release the client semaphore permits
     AtomicInteger blockedRequestsCount = new AtomicInteger();
@@ -133,9 +139,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
 
     //Unset the blockTransaction flag so that semaphore permits can be released
-    for (RaftServerProxy server : cluster.getServers()) {
-      ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(false);
-    }
+    setBlockTransaction(false, cluster);
     for(int i=0; i<=numMessages; i++){
       futures[i].join();
     }
@@ -148,7 +152,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     try {
       cluster.start();
       waitForLeader(cluster);
-      RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 1000, cluster, LOG);
+      RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 100, cluster, LOG);
     } finally {
       cluster.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8c744ab..8c0def9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -176,7 +176,7 @@ public abstract class RaftBasicTests extends BaseTest {
     LOG.info(cluster.printAllLogs());
 
     for(RaftServerProxy server : cluster.getServers()) {
-      final RaftServerImpl impl = server.getImpl();
+      final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
       if (impl.isAlive() || replication == ReplicationLevel.ALL) {
         JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
             5, 1000, impl.getId() + " assertLogEntries", LOG);
@@ -420,7 +420,7 @@ public abstract class RaftBasicTests extends BaseTest {
       if (c.exceptionInClientThread.get() != null) {
         throw new AssertionError(c.exceptionInClientThread.get());
       }
-      RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
+      RaftTestUtil.assertLogEntries(cluster, c.messages);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 53a051a..8667f50 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -52,13 +52,8 @@ import java.util.function.Predicate;
 
 
 public interface RaftTestUtil {
-
   Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
 
-  static RaftServerImpl getImplAsUnchecked(RaftServerProxy proxy) {
-    return JavaUtils.callAsUnchecked(proxy::getImpl);
-  }
-
   static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
       throws InterruptedException {
     return waitForLeader(cluster, false);
@@ -156,11 +151,9 @@ public interface RaftTestUtil {
     }
   }
 
-  static void assertLogEntries(Collection<RaftServerProxy> servers,
-      SimpleMessage... expectedMessages) {
-    final int size = servers.size();
-    final long count = MiniRaftCluster.getServerStream(servers)
-        .filter(RaftServerImpl::isAlive)
+  static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... expectedMessages) {
+    final int size = cluster.getServers().size();
+    final long count = cluster.getServerAliveStream()
         .map(s -> s.getState().getLog())
         .filter(log -> logEntriesContains(log, expectedMessages))
         .count();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
index 67156a1..04a2a8d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -60,7 +61,7 @@ public class TestRaftServerLeaderElectionTimeout extends BaseTest {
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     Assert.assertNull(cluster.getLeader());
     cluster.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 811e7de..51af8af 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -64,7 +64,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
   }
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     Assert.assertNull(cluster.getLeader());
     cluster.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 4886bcc..54fbf5f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -101,7 +101,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     }
 
     final RaftServerImpl leader = waitForLeader(cluster);
-    final TimeDuration sleepTime = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+    final TimeDuration sleepTime = TimeDuration.valueOf(3, TimeUnit.SECONDS);
     LOG.info("sleep " + sleepTime);
     sleepTime.sleep();
 
@@ -109,8 +109,9 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     final RaftServerProxy lastServer = i.next();
     lastServer.start();
     final RaftPeerId lastServerLeaderId = JavaUtils.attempt(
-        () -> getLeader(lastServer.getImpl().getState()),
+        () -> getLeader(lastServer.getImpls().iterator().next().getState()),
         10, 1000, "getLeaderId", LOG);
+    LOG.info(cluster.printServers());
     Assert.assertEquals(leader.getId(), lastServerLeaderId);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 8a340d7..79017d4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -321,8 +321,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
       final RaftLog leaderLog = cluster.getLeader().getState().getLog();
       for (RaftPeer newPeer : c1.newPeers) {
         Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
-            cluster.getServer(newPeer.getId())
-                .getImpl().getState().getLog()
+            cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
                 .getEntries(0, Long.MAX_VALUE));
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index a72e6f5..a4ec715 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.stream.Stream;
 
 public class RaftServerTestUtil {
@@ -87,4 +89,12 @@ public class RaftServerTestUtil {
   public static Logger getStateMachineUpdaterLog() {
     return StateMachineUpdater.LOG;
   }
+
+  public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy) {
+    return JavaUtils.callAsUnchecked(proxy::getImpls);
+  }
+
+  public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
+    return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 251a4d3..f6e417c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -48,6 +48,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
   static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class);
 
   {
+    LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
@@ -66,7 +67,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
     LOG.info("Start testReinitialize" + cluster.printServers());
 
     // Start server with an empty conf
-    final RaftGroupId groupId = RaftGroupId.randomId();
+    final RaftGroupId groupId = cluster.getGroupId();
     final RaftGroup group = new RaftGroup(groupId);
 
     final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
@@ -75,10 +76,10 @@ public abstract class ReinitializationBaseTest extends BaseTest {
     LOG.info("putNewServer: " + cluster.printServers());
 
     cluster.start();
-    LOG.info("start: " + cluster.printServers());
 
     // Make sure that there are no leaders.
     TimeUnit.SECONDS.sleep(1);
+    LOG.info("start: " + cluster.printServers());
     Assert.assertNull(cluster.getLeader());
 
     // Reinitialize servers
@@ -128,18 +129,20 @@ public abstract class ReinitializationBaseTest extends BaseTest {
     LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
 
     // Start server with an empty conf
-    final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.randomId());
+    final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId());
 
     final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
         .map(RaftPeerId::valueOf).collect(Collectors.toList());
+    LOG.info("ids: " + ids);
     ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true));
     LOG.info("putNewServer: " + cluster.printServers());
 
+    TimeUnit.SECONDS.sleep(1);
     cluster.start();
-    LOG.info("start: " + cluster.printServers());
 
     // Make sure that there are no leaders.
     TimeUnit.SECONDS.sleep(1);
+    LOG.info("start: " + cluster.printServers());
     Assert.assertNull(cluster.getLeader());
 
     // Reinitialize servers to three groups

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 1be6c3a..c4a9a5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.server.simulation;
 
-import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
@@ -25,6 +24,8 @@ import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,7 +126,7 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() {
     @Override
     public boolean isAlive() {
-      return RaftTestUtil.getImplAsUnchecked(server).isAlive();
+      return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
     }
 
     @Override
@@ -147,7 +148,7 @@ class SimulatedServerRpc implements RaftServerRpc {
       = new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() {
     @Override
     public boolean isAlive() {
-      return RaftTestUtil.getImplAsUnchecked(server).isAlive();
+      return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
     }
 
     @Override
@@ -168,7 +169,8 @@ class SimulatedServerRpc implements RaftServerRpc {
 
       future.whenCompleteAsync((reply, exception) -> {
         try {
-          final IOException e = IOUtils.asIOException(exception);
+          final IOException e = exception == null? null
+              : IOUtils.asIOException(JavaUtils.unwrapCompletionException(exception));
           clientHandler.getRpc().sendReply(request, reply, e);
         } catch (IOException e) {
           LOG.warn("Failed to send reply {} for request {} due to exception {}",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index ef018e5..cf0f611 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
 public abstract class RaftSnapshotBaseTest extends BaseTest {
@@ -82,7 +83,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
   public abstract MiniRaftCluster.Factory<?> getFactory();
 
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     final RaftProperties prop = new RaftProperties();
     prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index a045ecd..07039b1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -27,7 +27,6 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
@@ -68,31 +67,19 @@ public class TestStateMachine extends BaseTest {
 
   @Before
   public void setup() throws IOException {
-  }
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class);
 
-  private void startCluster() {
-    cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(
-        NUM_SERVERS, properties);
-    Assert.assertNull(getCluster().getLeader());
-    getCluster().start();
+    cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(NUM_SERVERS, properties);
+    cluster.start();
   }
 
   @After
   public void tearDown() {
-    final MiniRaftCluster cluster = getCluster();
     if (cluster != null) {
       cluster.shutdown();
     }
   }
 
-  public MiniRaftClusterWithSimulatedRpc getCluster() {
-    return cluster;
-  }
-
-  public RaftProperties getProperties() {
-    return properties;
-  }
-
   static class SMTransactionContext extends SimpleStateMachine4Testing {
     public static SMTransactionContext get(RaftServerImpl s) {
       return (SMTransactionContext)s.getStateMachine();
@@ -149,11 +136,6 @@ public class TestStateMachine extends BaseTest {
   @Test
   public void testTransactionContextIsPassedBack() throws Throwable {
     // tests that the TrxContext set by the StateMachine in Leader is passed back to the SM
-    properties.setClass(
-        MiniRaftCluster.STATEMACHINE_CLASS_KEY,
-        SMTransactionContext.class, StateMachine.class);
-    startCluster();
-
     int numTrx = 100;
     final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx);
     try(final RaftClient client = cluster.createClient()) {
@@ -165,8 +147,8 @@ public class TestStateMachine extends BaseTest {
     // TODO: there should be a better way to ensure all data is replicated and applied
     Thread.sleep(cluster.getMaxTimeout() + 100);
 
-    for (RaftServerProxy raftServer : cluster.getServers()) {
-      final SMTransactionContext sm = SMTransactionContext.get(raftServer.getImpl());
+    for (RaftServerImpl raftServer : cluster.iterateServerImpls()) {
+      final SMTransactionContext sm = SMTransactionContext.get(raftServer);
       sm.rethrowIfException();
       assertEquals(numTrx, sm.numApplied.get());
     }