You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/03/04 01:30:44 UTC

incubator-ratis git commit: RATIS-30. Provide a way to pass parameters to rpc implementations.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 5a94eac05 -> 46116d412


RATIS-30. Provide a way to pass parameters to rpc implementations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/46116d41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/46116d41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/46116d41

Branch: refs/heads/master
Commit: 46116d412add72d761ebd6a9325362dc19227459
Parents: 5a94eac
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Mar 3 17:30:18 2017 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Mar 3 17:30:18 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/ratis/RaftConfigKeys.java   |   3 +-
 .../java/org/apache/ratis/conf/Parameters.java  |  66 +++++++++
 .../main/java/org/apache/ratis/rpc/RpcType.java |   5 +-
 .../org/apache/ratis/rpc/SupportedRpcType.java  |   9 +-
 .../java/org/apache/ratis/util/RaftUtils.java   |  44 +++---
 .../ratis/examples/RaftExamplesTestUtil.java    |   2 +-
 .../java/org/apache/ratis/grpc/GrpcFactory.java |   3 +
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     |  51 +++----
 .../ratis/hadooprpc/HadoopConfigKeys.java       |  68 ++++++++++
 .../apache/ratis/hadooprpc/HadoopFactory.java   |  25 +++-
 .../server/HadoopRpcServerConfigKeys.java       |  56 --------
 .../hadooprpc/server/HadoopRpcService.java      |   5 +-
 .../hadooprpc/MiniRaftClusterWithHadoopRpc.java |  47 +++----
 .../org/apache/ratis/netty/NettyFactory.java    |   3 +
 .../ratis/netty/MiniRaftClusterWithNetty.java   |  24 ++--
 .../org/apache/ratis/server/RaftServer.java     |  12 +-
 .../ratis/server/impl/RaftServerImpl.java       |  12 +-
 .../ratis/server/impl/ServerImplUtils.java      |  13 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  | 133 +++++++++----------
 .../impl/RaftReconfigurationBaseTest.java       |   2 +-
 .../MiniRaftClusterWithSimulatedRpc.java        |  95 +++++--------
 .../simulation/SimulatedRequestReply.java       |  11 +-
 .../ratis/server/simulation/SimulatedRpc.java   |  32 +++--
 23 files changed, 412 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
index adf393b..a8bc57d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -43,7 +43,8 @@ public interface RaftConfigKeys {
       }
 
       // Try using it as a class name
-      return RaftUtils.newInstance(t, properties, RpcType.class);
+      return RaftUtils.newInstance(
+          RaftUtils.getClass(t, properties, RpcType.class));
     }
 
     static void setType(BiConsumer<String, String> setRpcType, RpcType type) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
new file mode 100644
index 0000000..f562d24
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.conf;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A generic parameter map.
+ * The difference between this class and {@link RaftProperties} is that
+ * {@link RaftProperties} is {@link String} based, i.e. properties are strings,
+ * while this class is {@link Object} based, i.e. parameters can be any objects.
+ *
+ * Null keys or null values are not supported.
+ *
+ * This class is thread safe.
+ */
+public class Parameters {
+  private final Map<String, Object> map = new ConcurrentHashMap<>();
+
+  /** Put the key-value pair to the map. */
+  public <T> T put(String key, T value, Class<T> valueClass) {
+    return valueClass.cast(map.put(
+        Objects.requireNonNull(key, "key is null"),
+        Objects.requireNonNull(value, () -> "value is null, key=" + key)));
+  }
+
+  /**
+   * @param <T> The value type.
+   * @return The value mapped to the given key;
+   *         or return null if the key does not map to any value.
+   * @throws IllegalArgumentException if the mapped value is not an instance of the given class.
+   */
+  public <T> T get(String key, Class<T> valueClass) {
+    final Object value = map.get(Objects.requireNonNull(key, "key is null"));
+    return valueClass.cast(value);
+  }
+
+  /**
+   * The same as {@link #get(String, Class)} except that this method throws
+   * a {@link NullPointerException} if the key does not map to any value.
+   */
+  public <T> T getNonNull(String key, Class<T> valueClass) {
+    final T value = get(key, valueClass);
+    if (value != null) {
+      return value;
+    }
+    throw new NullPointerException("The key " + key + " does not map to any value.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
index a8085fe..701e979 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.rpc;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 
 /** The type of RPC implementations. */
@@ -24,8 +25,8 @@ public interface RpcType {
   /** @return the name of the rpc type. */
   String name();
 
-  /** @return a new factory created using the given properties. */
-  RpcFactory newFactory(RaftProperties properties);
+  /** @return a new factory created using the given properties and parameters. */
+  RpcFactory newFactory(RaftProperties properties, Parameters parameters);
 
   /** An interface to get {@link RpcType}. */
   interface Get {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
index dcba59b..f1d8fac 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.rpc;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.util.RaftUtils;
 
@@ -31,6 +32,8 @@ public enum SupportedRpcType implements RpcType {
     return valueOf(s.toUpperCase());
   }
 
+  private static final Class<?>[] ARG_CLASSES = {Parameters.class};
+
   private final String factoryClassName;
 
   SupportedRpcType(String factoryClassName) {
@@ -38,7 +41,9 @@ public enum SupportedRpcType implements RpcType {
   }
 
   @Override
-  public RpcFactory newFactory(RaftProperties properties) {
-    return RaftUtils.newInstance(factoryClassName, properties, RpcFactory.class);
+  public RpcFactory newFactory(RaftProperties properties, Parameters parameters) {
+    final Class<? extends RpcFactory> clazz = RaftUtils.getClass(
+        factoryClassName, properties, RpcFactory.class);
+    return RaftUtils.newInstance(clazz, ARG_CLASSES, parameters);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 7f5703d..02b227c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -28,9 +28,7 @@ import java.io.*;
 import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
@@ -85,12 +83,13 @@ public abstract class RaftUtils {
   public static final boolean PPC_64
       = System.getProperties().getProperty("os.arch").contains("ppc64");
 
+  public static final Class<?>[] EMPTY_CLASSES = {};
   /**
    * Cache of constructors for each class. Pins the classes so they
    * can't be garbage collected until ReflectionUtils can be collected.
    */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<>();
+  private static final Map<List<Class<?>>, Constructor<?>> CONSTRUCTOR_CACHE
+      = new ConcurrentHashMap<>();
 
   public static InterruptedIOException toInterruptedIOException(
       String message, InterruptedException e) {
@@ -120,31 +119,40 @@ public abstract class RaftUtils {
 
   /**
    * Create an object for the given class using its default constructor.
+   */
+  public static <T> T newInstance(Class<T> clazz) {
+    return newInstance(clazz, EMPTY_CLASSES);
+  }
+
+  /**
+   * Create an object for the given class using the specified constructor.
    *
    * @param clazz class of which an object is created
+   * @param argClasses argument classes of the constructor
+   * @param args actual arguments to be passed to the constructor
+   * @param <T> class type of clazz
    * @return a new object
    */
-  public static <T> T newInstance(Class<T> clazz) {
+  public static <T> T newInstance(Class<T> clazz, Class<?>[] argClasses, Object... args) {
     Objects.requireNonNull(clazz, "clazz == null");
     try {
+      final List<Class<?>> key = new ArrayList<>();
+      key.add(clazz);
+      key.addAll(Arrays.asList(argClasses));
+
       @SuppressWarnings("unchecked")
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (meth == null) {
-        meth = clazz.getDeclaredConstructor();
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, meth);
+      Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(key);
+      if (ctor == null) {
+        ctor = clazz.getDeclaredConstructor(argClasses);
+        ctor.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(key, ctor);
       }
-      return meth.newInstance();
+      return ctor.newInstance(args);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  public static <BASE, SUB extends BASE> SUB newInstance(
-      String subClassName, RaftProperties properties, Class<BASE> base) {
-    return newInstance(getClass(subClassName, properties, base));
-  }
-
   public static <BASE, SUB extends BASE> Class<SUB> getClass(
       String subClassName, RaftProperties properties, Class<BASE> base) {
     try {
@@ -309,8 +317,8 @@ public abstract class RaftUtils {
 
   public static <INPUT, OUTPUT> Iterable<OUTPUT> as(
       Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
-    final Iterator<INPUT> i = iteration.iterator();
     return () -> new Iterator<OUTPUT>() {
+      final Iterator<INPUT> i = iteration.iterator();
       @Override
       public boolean hasNext() {
         return i.hasNext();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
index f54b766..7804353 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
@@ -37,7 +37,7 @@ public class RaftExamplesTestUtil {
       Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
       String[] ids, RaftProperties properties)
       throws IOException {
-    clusters.add(new Object[]{factory.newCluster(ids, properties, true)});
+    clusters.add(new Object[]{factory.newCluster(ids, properties)});
   }
 
   public static Collection<Object[]> getMiniRaftClusters(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 3ae2602..60d6ef6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -18,12 +18,15 @@
 package org.apache.ratis.grpc;
 
 import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.grpc.client.GrpcClientRpc;
 import org.apache.ratis.grpc.server.GRpcLogAppender;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.*;
 
 public class GrpcFactory implements ServerFactory, ClientFactory {
+  public GrpcFactory(Parameters parameters) {}
+
   @Override
   public SupportedRpcType getRpcType() {
     return SupportedRpcType.GRPC;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 4c7d74d..9676a48 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -21,61 +21,48 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
-import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.*;
+import org.apache.ratis.statemachine.StateMachine;
 
-import java.util.Collection;
+import java.io.IOException;
 
 public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithGRpc> FACTORY
       = new Factory<MiniRaftClusterWithGRpc>() {
     @Override
     public MiniRaftClusterWithGRpc newCluster(
-        String[] ids, RaftProperties prop, boolean formatted) {
+        String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.GRPC);
-      return new MiniRaftClusterWithGRpc(ids, prop, formatted);
+      return new MiniRaftClusterWithGRpc(ids, prop);
     }
   };
 
   public static final DelayLocalExecutionInjection sendServerRequestInjection =
       new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
 
-  private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
-      boolean formatted) {
-    super(ids, properties, formatted);
+  private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) {
+    super(ids, properties, null);
   }
 
   @Override
-  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
-    final RaftServerImpl s = super.newRaftServer(id, format);
-    s.getProperties().setInt(
-        RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(s));
-    return s;
+  protected RaftServerImpl newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties) throws IOException {
+    properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(id, conf));
+    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
   }
 
   @Override
-  protected Collection<RaftPeer> addNewPeers(
-      Collection<RaftServerImpl> newServers, boolean startService) {
-    final Collection<RaftPeer> peers = toRaftPeers(newServers);
-    for (RaftPeer p: peers) {
-      final RaftServerImpl server = servers.get(p.getId());
-      if (!startService) {
-        BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString());
-      } else {
-        server.start();
-      }
+  protected void startServer(RaftServerImpl server, boolean startService) {
+    final String id = server.getId().toString();
+    if (startService) {
+      server.start();
+      BlockRequestHandlingInjection.getInstance().unblockReplier(id);
+    } else {
+      BlockRequestHandlingInjection.getInstance().blockReplier(id);
     }
-    return peers;
-  }
-
-  @Override
-  public void startServer(RaftPeerId id) {
-    super.startServer(id);
-    BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
new file mode 100644
index 0000000..cc09510
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.Parameters;
+
+import java.net.InetSocketAddress;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/** Hadoop Rpc specific configuration properties. */
+public interface HadoopConfigKeys {
+  String PREFIX = "raft.hadooprpc";
+
+  String CONF_KEY = PREFIX + ".conf";
+
+  static Configuration getConf(
+      BiFunction<String, Class<Configuration>, Configuration> getConf) {
+    return getConf.apply(CONF_KEY, Configuration.class);
+  }
+
+  static void setConf(Parameters parameters, Configuration conf) {
+    parameters.put(CONF_KEY, conf, Configuration.class);
+  }
+
+  /** IPC server configurations */
+  interface Ipc {
+    String PREFIX = HadoopConfigKeys.PREFIX + ".ipc";
+
+    String ADDRESS_KEY = PREFIX + ".address";
+    int DEFAULT_PORT = 10718;
+    String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT;
+
+    String HANDLERS_KEY = PREFIX + ".handlers";
+    int HANDLERS_DEFAULT = 10;
+
+    static int handlers(BiFunction<String, Integer, Integer> getInt) {
+      return ConfUtils.getInt(getInt,
+          HANDLERS_KEY, HANDLERS_DEFAULT, 1, null);
+    }
+
+    static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) {
+      return ConfUtils.getInetSocketAddress(getTrimmed,
+          ADDRESS_KEY, ADDRESS_DEFAULT);
+    }
+
+    static void setAddress(BiConsumer<String, String> setString, String address) {
+      ConfUtils.set(setString, ADDRESS_KEY, address);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index 7b9e20f..ad866c8 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -19,6 +19,7 @@ package org.apache.ratis.hadooprpc;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.hadooprpc.client.HadoopClientRpc;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -26,10 +27,24 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
 public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory {
-  private Configuration conf;
+  public static Parameters newRaftParameters(Configuration conf) {
+    final Parameters p = new Parameters();
+    HadoopConfigKeys.setConf(p, conf);
+    return p;
+  }
+
+  private final Configuration conf;
+
+  public HadoopFactory(Parameters parameters) {
+    this(HadoopConfigKeys.getConf(parameters::get));
+  }
+
+  public HadoopFactory(Configuration conf) {
+    this.conf = conf != null? conf: new Configuration();
+  }
 
-  public void setConf(Configuration conf) {
-    this.conf = conf;
+  public Configuration getConf() {
+    return conf;
   }
 
   @Override
@@ -41,12 +56,12 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa
   public HadoopRpcService newRaftServerRpc(RaftServerImpl server) {
     return HadoopRpcService.newBuilder()
         .setServer(server)
-        .setConf(conf)
+        .setConf(getConf())
         .build();
   }
 
   @Override
   public HadoopClientRpc newRaftClientRpc() {
-    return new HadoopClientRpc(conf);
+    return new HadoopClientRpc(getConf());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
deleted file mode 100644
index bfdf05b..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.server;
-
-import org.apache.ratis.conf.ConfUtils;
-
-import java.net.InetSocketAddress;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-
-public interface HadoopRpcServerConfigKeys {
-  String PREFIX = "raft.hadooprpc";
-
-  /** IPC server configurations */
-  abstract class Ipc {
-    public static final String PREFIX = HadoopRpcServerConfigKeys.PREFIX + ".ipc";
-
-    public static final String ADDRESS_KEY = PREFIX + ".address";
-    public static final int DEFAULT_PORT = 10718;
-    public static final String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT;
-
-    public static final String HANDLERS_KEY = PREFIX + ".handlers";
-    public static final int HANDLERS_DEFAULT = 10;
-
-    public static int handlers(BiFunction<String, Integer, Integer> getInt) {
-      return ConfUtils.getInt(getInt,
-          HANDLERS_KEY, HANDLERS_DEFAULT, 1, null);
-    }
-
-    public static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) {
-      return ConfUtils.getInetSocketAddress(getTrimmed,
-          ADDRESS_KEY, ADDRESS_DEFAULT);
-    }
-
-    public static void setAddress(
-        BiConsumer<String, String> setString,
-        String address) {
-      ConfUtils.set(setString, ADDRESS_KEY, address);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 87fd2e2..00d69aa 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.ratis.hadooprpc.HadoopConfigKeys;
 import org.apache.ratis.hadooprpc.Proxy;
 import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
 import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
@@ -118,8 +119,8 @@ public class HadoopRpcService implements RaftServerRpc {
   private static RPC.Server newRpcServer(
       RaftServerProtocol serverProtocol, final Configuration conf)
       throws IOException {
-    final int handlerCount = HadoopRpcServerConfigKeys.Ipc.handlers(conf::getInt);
-    final InetSocketAddress address = HadoopRpcServerConfigKeys.Ipc.address(conf::getTrimmed);
+    final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf::getInt);
+    final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf::getTrimmed);
 
     final BlockingService service
         = RaftServerProtocolService.newReflectiveBlockingService(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index 76acafd..b3a607b 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -22,36 +22,39 @@ import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys;
 import org.apache.ratis.hadooprpc.server.HadoopRpcService;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
   static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class);
 
   public static class Factory extends MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> {
     @Override
-    public MiniRaftClusterWithHadoopRpc newCluster(
-        String[] ids, RaftProperties prop, boolean formatted) {
+    public MiniRaftClusterWithHadoopRpc newCluster(String[] ids, RaftProperties prop) {
       final Configuration conf = new Configuration();
-      return newCluster(ids, prop, conf, formatted);
+      return newCluster(ids, prop, conf);
     }
 
     public MiniRaftClusterWithHadoopRpc newCluster(
         int numServers, RaftProperties properties, Configuration conf) {
-      return newCluster(generateIds(numServers, 0), properties, conf, true);
+      return newCluster(generateIds(numServers, 0), properties, conf);
     }
 
     public MiniRaftClusterWithHadoopRpc newCluster(
-        String[] ids, RaftProperties prop, Configuration conf, boolean formatted) {
+        String[] ids, RaftProperties prop, Configuration conf) {
       RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.HADOOP);
-      HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
-      return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted);
+      HadoopConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
+      return new MiniRaftClusterWithHadoopRpc(ids, prop, conf);
     }
   }
 
@@ -63,27 +66,21 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
   private final Configuration hadoopConf;
 
   private MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties,
-      Configuration hadoopConf, boolean formatted) {
-    super(ids, properties, formatted);
+      Configuration hadoopConf) {
+    super(ids, properties, HadoopFactory.newRaftParameters(hadoopConf));
     this.hadoopConf = hadoopConf;
-    getServers().stream().forEach(s -> setConf(s));
-    ((HadoopFactory)clientFactory).setConf(hadoopConf);
-  }
-
-  private void setConf(RaftServerImpl server) {
-    final Configuration conf = new Configuration(hadoopConf);
-    final String address = "0.0.0.0:" + getPort(server);
-    HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, address);
-    ((HadoopFactory)server.getFactory()).setConf(conf);
   }
 
   @Override
-  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
-    final RaftServerImpl s = super.newRaftServer(id, format);
-    if (hadoopConf != null) {
-      setConf(s);
-    }
-    return s;
+  protected RaftServerImpl newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties) throws IOException {
+    final Configuration hconf = new Configuration(hadoopConf);
+    final String address = "0.0.0.0:" + getPort(id, conf);
+    HadoopConfigKeys.Ipc.setAddress(hconf::set, address);
+
+    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
+        HadoopFactory.newRaftParameters(hconf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 525b991..6dcfd15 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.netty;
 
 import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.netty.client.NettyClientRpc;
 import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.rpc.SupportedRpcType;
@@ -25,6 +26,8 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
 
 public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory {
+  public NettyFactory(Parameters parameters) {}
+
   @Override
   public SupportedRpcType getRpcType() {
     return SupportedRpcType.NETTY;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 29857dd..697aef6 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -25,32 +25,36 @@ import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
 
 public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithNetty> FACTORY
       = new Factory<MiniRaftClusterWithNetty>() {
     @Override
-    public MiniRaftClusterWithNetty newCluster(
-        String[] ids, RaftProperties prop, boolean formatted) {
+    public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.NETTY);
-      return new MiniRaftClusterWithNetty(ids, prop, formatted);
+      return new MiniRaftClusterWithNetty(ids, prop);
     }
   };
 
   public static final DelayLocalExecutionInjection sendServerRequest
       = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
 
-  private MiniRaftClusterWithNetty(
-      String[] ids, RaftProperties properties, boolean formatted) {
-    super(ids, properties, formatted);
+  private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
+    super(ids, properties, null);
   }
 
   @Override
-  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
-    final RaftServerImpl s = super.newRaftServer(id, format);
-    NettyConfigKeys.Server.setPort(s.getProperties()::setInt, getPort(s));
-    return s;
+  protected RaftServerImpl newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties) throws IOException {
+    NettyConfigKeys.Server.setPort(properties::setInt, getPort(id, conf));
+    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 7417e0d..dbd32b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -17,13 +17,13 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
 import org.apache.ratis.protocol.RaftClientProtocol;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.server.impl.ServerImplUtils;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -65,6 +65,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
     private StateMachine stateMachine;
     private Iterable<RaftPeer> peers;
     private RaftProperties properties;
+    private Parameters parameters;
 
     /** @return a {@link RaftServer} object. */
     public RaftServer build() throws IOException {
@@ -72,7 +73,8 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
           Objects.requireNonNull(serverId, "The 'serverId' field is not initialized."),
           Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."),
           Objects.requireNonNull(peers, "The 'peers' field is not initialized."),
-          Objects.requireNonNull(properties, "The 'properties' field is not initialized."));
+          Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
+          parameters);
     }
 
     /** Set the server ID. */
@@ -98,5 +100,11 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
       this.properties = properties;
       return this;
     }
+
+    /** Set {@link Parameters}. */
+    public Builder setParameters(Parameters parameters) {
+      this.parameters = parameters;
+      return this;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d729914..b9f063e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
@@ -47,7 +48,6 @@ import java.util.List;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 
 import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
 import static org.apache.ratis.util.LifeCycle.State.*;
@@ -84,12 +84,12 @@ public class RaftServerImpl implements RaftServer {
   /** used when the peer is leader */
   private volatile LeaderState leaderState;
 
-  private final Supplier<RaftServerRpc> serverRpc;
+  private final RaftServerRpc serverRpc;
 
   private final ServerFactory factory;
 
   RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
-                 RaftConfiguration raftConf, RaftProperties properties)
+      RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
       throws IOException {
     this.lifeCycle = new LifeCycle(id);
     minTimeoutMs = properties.getInt(
@@ -105,8 +105,8 @@ public class RaftServerImpl implements RaftServer {
     this.state = new ServerState(id, raftConf, properties, this, stateMachine);
 
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
-    this.factory = ServerFactory.cast(rpcType.newFactory(properties));
-    this.serverRpc = RaftUtils.memoize(() -> initRaftServerRpc());
+    this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
+    this.serverRpc = initRaftServerRpc();
   }
 
   @Override
@@ -147,7 +147,7 @@ public class RaftServerImpl implements RaftServer {
   }
 
   public RaftServerRpc getServerRpc() {
-    return serverRpc.get();
+    return serverRpc;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 5248906..30d6e29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -30,17 +31,17 @@ import java.io.IOException;
 /** Server utilities for internal use. */
 public class ServerImplUtils {
   public static RaftServer newRaftServer(
-      RaftPeerId id, StateMachine stateMachine,
-      Iterable<RaftPeer> peers, RaftProperties properties) throws IOException {
+      RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers,
+      RaftProperties properties, Parameters parameters) throws IOException {
     return newRaftServer(id, stateMachine,
         RaftConfiguration.newBuilder().setConf(peers).build(),
-        properties);
+        properties, parameters);
   }
 
   public static RaftServerImpl newRaftServer(
-      RaftPeerId id, StateMachine stateMachine,
-      RaftConfiguration conf, RaftProperties properties) throws IOException {
-    return new RaftServerImpl(id, stateMachine, conf, properties);
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties, Parameters parameters) throws IOException {
+    return new RaftServerImpl(id, stateMachine, conf, properties, parameters);
   }
 
   public static TermIndex newTermIndex(long term, long index) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 9c566a9..6d14c24 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -20,6 +20,7 @@ package org.apache.ratis;
 import com.google.common.base.Preconditions;
 import org.apache.ratis.client.ClientFactory;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -42,7 +43,9 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
 
@@ -59,22 +62,16 @@ public abstract class MiniRaftCluster {
 
   public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
     public abstract CLUSTER newCluster(
-        String[] ids, RaftProperties prop, boolean formatted);
+        String[] ids, RaftProperties prop);
 
     public CLUSTER newCluster(int numServer, RaftProperties prop) {
-      return newCluster(generateIds(numServer, 0), prop, true);
+      return newCluster(generateIds(numServer, 0), prop);
     }
   }
 
   public static abstract class RpcBase extends MiniRaftCluster {
-    public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
-      super(ids, properties, formatted);
-    }
-
-    @Override
-    public void restartServer(String id, boolean format) throws IOException {
-      super.restartServer(id, format);
-      getServer(id).start();
+    public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) {
+      super(ids, properties, parameters);
     }
 
     @Override
@@ -82,12 +79,6 @@ public abstract class MiniRaftCluster {
       RaftTestUtil.setBlockRequestsFrom(src, block);
     }
 
-    public static int getPort(RaftServerImpl server) {
-      final int port = getPort(server.getId(), server.getState().getRaftConf());
-      LOG.info(server.getId() + "(" + server.getRpcType() + "), port=" + port);
-      return port;
-    }
-
     public static int getPort(RaftPeerId id, RaftConfiguration conf) {
       final RaftPeer peer = conf.getPeer(id);
       final String address = peer != null? peer.getAddress(): null;
@@ -144,27 +135,46 @@ public abstract class MiniRaftCluster {
   protected final ClientFactory clientFactory;
   protected RaftConfiguration conf;
   protected final RaftProperties properties;
+  protected final Parameters parameters;
   private final String testBaseDir;
-  protected final Map<RaftPeerId, RaftServerImpl> servers =
-      Collections.synchronizedMap(new LinkedHashMap<>());
+  protected final Map<RaftPeerId, RaftServerImpl> servers = new ConcurrentHashMap<>();
 
-  public MiniRaftCluster(String[] ids, RaftProperties properties,
-      boolean formatted) {
+  protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
     this.conf = initConfiguration(ids);
     this.properties = new RaftProperties(properties);
+    this.parameters = parameters;
 
     final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
-    this.clientFactory = ClientFactory.cast(rpcType.newFactory(properties));
+    this.clientFactory = ClientFactory.cast(
+        rpcType.newFactory(properties, parameters));
     this.testBaseDir = getBaseDirectory();
 
-    conf.getPeers().forEach(
-        p -> servers.put(p.getId(), newRaftServer(p.getId(), formatted)));
-
     ExitUtils.disableSystemExit();
   }
 
+  public MiniRaftCluster initServers() {
+    if (servers.isEmpty()) {
+      putNewServers(RaftUtils.as(conf.getPeers(), RaftPeer::getId), true);
+    }
+    return this;
+  }
+
+  private RaftServerImpl putNewServer(RaftPeerId id, boolean format) {
+    final RaftServerImpl s = newRaftServer(id, format);
+    Preconditions.checkState(servers.put(id, s) == null);
+    return s;
+  }
+
+  private Collection<RaftServerImpl> putNewServers(
+      Iterable<RaftPeerId> peers, boolean format) {
+    return StreamSupport.stream(peers.spliterator(), false)
+        .map(id -> putNewServer(id, format))
+        .collect(Collectors.toList());
+  }
+
   public void start() {
     LOG.info("Starting " + getClass().getSimpleName());
+    initServers();
     servers.values().forEach(RaftServerImpl::start);
   }
 
@@ -175,24 +185,19 @@ public abstract class MiniRaftCluster {
     final RaftPeerId newId = new RaftPeerId(id);
     killServer(newId);
     servers.remove(newId);
-    servers.put(newId, newRaftServer(newId, format));
+
+    startServer(putNewServer(newId, format), true);
   }
 
-  public final void restart(boolean format) throws IOException {
+  public void restart(boolean format) throws IOException {
     servers.values().stream().filter(RaftServerImpl::isAlive)
         .forEach(RaftServerImpl::close);
     List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
-    for (RaftPeerId id : idList) {
-      servers.remove(id);
-      servers.put(id, newRaftServer(id, format));
-    }
-
-    initRpc();
+    servers.clear();
+    putNewServers(idList, format);
     start();
   }
 
-  protected void initRpc() {}
-
   public int getMaxTimeout() {
     return properties.getInt(
         RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
@@ -203,7 +208,7 @@ public abstract class MiniRaftCluster {
     return conf;
   }
 
-  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+  private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
     try {
       final String dirStr = testBaseDir + id;
       if (format) {
@@ -212,12 +217,16 @@ public abstract class MiniRaftCluster {
       final RaftProperties prop = new RaftProperties(properties);
       prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
       final StateMachine stateMachine = getStateMachine4Test(properties);
-      return ServerImplUtils.newRaftServer(id, stateMachine, conf, prop);
+      return newRaftServer(id, stateMachine, conf, prop);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected abstract RaftServerImpl newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties) throws IOException;
+
   static StateMachine getStateMachine4Test(RaftProperties properties) {
     final Class<? extends StateMachine> smClass = properties.getClass(
         STATEMACHINE_CLASS_KEY,
@@ -229,17 +238,12 @@ public abstract class MiniRaftCluster {
   public static Collection<RaftPeer> toRaftPeers(
       Collection<RaftServerImpl> servers) {
     return servers.stream()
-        .map(s -> new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()))
+        .map(MiniRaftCluster::toRaftPeer)
         .collect(Collectors.toList());
   }
 
-  protected Collection<RaftPeer> addNewPeers(
-      Collection<RaftServerImpl> newServers, boolean startService) {
-    final Collection<RaftPeer> peers = toRaftPeers(newServers);
-    if (startService) {
-      newServers.forEach(RaftServerImpl::start);
-    }
-    return peers;
+  public static RaftPeer toRaftPeer(RaftServerImpl s) {
+    return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
   }
 
   public PeerChanges addNewPeers(int number, boolean startNewPeer)
@@ -252,18 +256,11 @@ public abstract class MiniRaftCluster {
     LOG.info("Add new peers {}", Arrays.asList(ids));
 
     // create and add new RaftServers
-    final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
-    for (String id : ids) {
-      Preconditions.checkArgument(!servers.containsKey(id));
-
-      final RaftPeerId peerId = new RaftPeerId(id);
-      final RaftServerImpl newServer = newRaftServer(peerId, true);
-      servers.put(peerId, newServer);
-      newServers.add(newServer);
-    }
-
-    final Collection<RaftPeer> newPeers = addNewPeers(newServers, startNewPeer);
+    final Collection<RaftServerImpl> newServers = putNewServers(
+        RaftUtils.as(Arrays.asList(ids), RaftPeerId::new), true);
+    newServers.forEach(s -> startServer(s, startNewPeer));
 
+    final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
     final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
     newPeers.addAll(conf.getPeers());
     conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
@@ -271,14 +268,14 @@ public abstract class MiniRaftCluster {
     return new PeerChanges(p, np, new RaftPeer[0]);
   }
 
-  public void startServer(RaftPeerId id) {
-    RaftServerImpl server = servers.get(id);
-    assert server != null;
-    server.start();
+  protected void startServer(RaftServerImpl server, boolean startService) {
+    if (startService) {
+      server.start();
+    }
   }
 
-  private RaftPeer getPeer(RaftServerImpl s) {
-    return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
+  public void startServer(RaftPeerId id) {
+    startServer(getServer(id), true);
   }
 
   /**
@@ -289,7 +286,7 @@ public abstract class MiniRaftCluster {
     Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
     List<RaftPeer> removedPeers = new ArrayList<>(number);
     if (removeLeader) {
-      final RaftPeer leader = getPeer(getLeader());
+      final RaftPeer leader = toRaftPeer(getLeader());
       assert !excluded.contains(leader);
       peers.remove(leader);
       removedPeers.add(leader);
@@ -297,7 +294,7 @@ public abstract class MiniRaftCluster {
     List<RaftServerImpl> followers = getFollowers();
     for (int i = 0, removed = 0; i < followers.size() &&
         removed < (removeLeader ? number - 1 : number); i++) {
-      RaftPeer toRemove = getPeer(followers.get(i));
+      RaftPeer toRemove = toRaftPeer(followers.get(i));
       if (!excluded.contains(toRemove)) {
         peers.remove(toRemove);
         removedPeers.add(toRemove);
@@ -381,13 +378,15 @@ public abstract class MiniRaftCluster {
   }
 
   public RaftServerImpl getServer(String id) {
-    return servers.get(new RaftPeerId(id));
+    return getServer(new RaftPeerId(id));
+  }
+
+  public RaftServerImpl getServer(RaftPeerId id) {
+    return servers.get(id);
   }
 
   public Collection<RaftPeer> getPeers() {
-    return getServers().stream().map(s ->
-        new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()))
-        .collect(Collectors.toList());
+    return toRaftPeers(getServers());
   }
 
   public RaftClient createClient(RaftPeerId leaderId) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 3017634..e3db854 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -583,7 +583,7 @@ public abstract class RaftReconfigurationBaseTest {
   @Test
   public void testLeaderNotReadyException() throws Exception {
     LOG.info("Start testLeaderNotReadyException");
-    final MiniRaftCluster cluster = getCluster(1);
+    final MiniRaftCluster cluster = getCluster(1).initServers();
     final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
     try {
       // delay 1s for each logSync call

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index e33d64f..3ed2596 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -19,16 +19,17 @@ package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
@@ -38,76 +39,52 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
       = new Factory<MiniRaftClusterWithSimulatedRpc>() {
     @Override
     public MiniRaftClusterWithSimulatedRpc newCluster(
-        String[] ids, RaftProperties prop, boolean formatted) {
+        String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop::set, SimulatedRpc.INSTANCE);
       if (ThreadLocalRandom.current().nextBoolean()) {
         // turn off simulate latency half of the times.
         prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
       }
-      return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted);
-    }
-  };
-
-  private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
-  private SimulatedClientRpc client2serverRequestReply;
-
-  private MiniRaftClusterWithSimulatedRpc(String[] ids,
-      RaftProperties properties, boolean formatted) {
-    super(ids, properties, formatted);
-    initRpc();
-  }
-
-  @Override
-  protected void initRpc() {
-    final int simulateLatencyMs = properties.getInt(
-        SimulatedRequestReply.SIMULATE_LATENCY_KEY,
-        SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT);
-    LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = "
-        + simulateLatencyMs);
-    serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs);
-    client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs);
-    getServers().stream().forEach(s -> initRpc(s));
-    addPeersToRpc(toRaftPeers(getServers()));
-    ((SimulatedRpc.Factory)clientFactory).initRpc(
-        serverRequestReply, client2serverRequestReply);
-  }
-
-  private void initRpc(RaftServerImpl s) {
-    if (serverRequestReply != null) {
-      ((SimulatedRpc.Factory)s.getFactory()).initRpc(
+      final int simulateLatencyMs = ConfUtils.getInt(prop::getInt,
+          SimulatedRequestReply.SIMULATE_LATENCY_KEY,
+          SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT, 0, null);
+      final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply
+          = new SimulatedRequestReply<>(simulateLatencyMs);
+      final SimulatedClientRpc client2serverRequestReply
+          = new SimulatedClientRpc(simulateLatencyMs);
+      return new MiniRaftClusterWithSimulatedRpc(ids, prop,
           serverRequestReply, client2serverRequestReply);
     }
-  }
-
-  private void addPeersToRpc(Collection<RaftPeer> peers) {
-    serverRequestReply.addPeers(peers);
-    client2serverRequestReply.addPeers(peers);
-  }
+  };
 
-  @Override
-  protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
-    final RaftServerImpl s = super.newRaftServer(id, format);
-    initRpc(s);
-    return s;
+  private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
+  private final SimulatedClientRpc client2serverRequestReply;
+
+  private MiniRaftClusterWithSimulatedRpc(
+      String[] ids, RaftProperties properties,
+      SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
+      SimulatedClientRpc client2serverRequestReply) {
+    super(ids, properties,
+        SimulatedRpc.Factory.newRaftParameters(serverRequestReply, client2serverRequestReply));
+    this.serverRequestReply = serverRequestReply;
+    this.client2serverRequestReply = client2serverRequestReply;
   }
 
   @Override
-  public void restartServer(String id, boolean format) throws IOException {
-    super.restartServer(id, format);
-    RaftServerImpl s = getServer(id);
-    addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id))));
-    s.start();
+  public void restart(boolean format) throws IOException {
+    serverRequestReply.clear();
+    client2serverRequestReply.clear();
+    super.restart(format);
   }
 
   @Override
-  public Collection<RaftPeer> addNewPeers(
-      Collection<RaftServerImpl> newServers, boolean startService) {
-    final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
-    addPeersToRpc(newPeers);
-    if (startService) {
-      newServers.forEach(RaftServerImpl::start);
-    }
-    return newPeers;
+  protected RaftServerImpl newRaftServer(
+      RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+      RaftProperties properties) throws IOException {
+    serverRequestReply.addPeer(id);
+    client2serverRequestReply.addPeer(id);
+    return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
+        parameters);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
index 95d3efa..64aaeac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.simulation;
 import com.google.common.base.Preconditions;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RaftRpcMessage;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.util.RaftUtils;
@@ -175,10 +176,12 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage,
     queues.remove(id);
   }
 
-  public void addPeers(Collection<RaftPeer> newPeers) {
-    for (RaftPeer peer : newPeers) {
-      queues.put(peer.getId().toString(), new EventQueue<>());
-    }
+  public void clear() {
+    queues.clear();
+  }
+
+  public void addPeer(RaftPeerId newPeer) {
+    queues.put(newPeer.toString(), new EventQueue<>());
   }
 
   private void simulateLatency() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 67193b4..69707aa 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -18,8 +18,8 @@
 package org.apache.ratis.server.simulation;
 
 import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.rpc.RpcFactory;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
@@ -35,19 +35,31 @@ class SimulatedRpc implements RpcType {
   }
 
   @Override
-  public RpcFactory newFactory(RaftProperties properties) {
-    return new Factory();
+  public Factory newFactory(RaftProperties properties, Parameters parameters) {
+    return new Factory(parameters);
   }
 
   static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
-    private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
-    private SimulatedClientRpc client2serverRequestReply;
+    static String SERVER_REQUEST_REPLY_KEY = "raft.simulated.serverRequestReply";
+    static String CLIENT_TO_SERVER_REQUEST_REPLY_KEY = "raft.simulated.client2serverRequestReply";
 
-    public void initRpc(
-        SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
-        SimulatedClientRpc client2serverRequestReply) {
-      this.serverRequestReply = Objects.requireNonNull(serverRequestReply);
-      this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply);
+    static Parameters newRaftParameters(
+        SimulatedRequestReply<RaftServerRequest, RaftServerReply> server,
+        SimulatedClientRpc client2server) {
+      final Parameters p = new Parameters();
+      p.put(SERVER_REQUEST_REPLY_KEY, server, SimulatedRequestReply.class);
+      p.put(CLIENT_TO_SERVER_REQUEST_REPLY_KEY, client2server, SimulatedClientRpc.class);
+      return p;
+    }
+
+    private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
+    private final SimulatedClientRpc client2serverRequestReply;
+
+    Factory(Parameters parameters) {
+      serverRequestReply = parameters.getNonNull(
+          SERVER_REQUEST_REPLY_KEY, SimulatedRequestReply.class);
+      client2serverRequestReply = parameters.getNonNull(
+          CLIENT_TO_SERVER_REQUEST_REPLY_KEY, SimulatedClientRpc.class);
     }
 
     @Override