You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/17 01:05:46 UTC

incubator-ratis git commit: RATIS-21. Add RpcType and ServerFactory. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 16eb8cc6b -> e1620e804


RATIS-21. Add RpcType and ServerFactory. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e1620e80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e1620e80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e1620e80

Branch: refs/heads/master
Commit: e1620e804c5be0fa735065631d45f2f6d8db4502
Parents: 16eb8cc
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Feb 16 17:05:28 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Feb 16 17:05:28 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/ratis/RpcType.java | 34 ++++++++++++
 .../org/apache/ratis/conf/RaftProperties.java   |  7 ++-
 .../java/org/apache/ratis/util/RaftUtils.java   | 52 ++++++++++++++----
 .../org/apache/ratis/grpc/RaftGRpcService.java  |  6 +++
 .../ratis/grpc/server/GrpcServerFactory.java    | 32 ++++++++++++
 .../server/PipelinedLogAppenderFactory.java     | 32 ------------
 .../ratis/grpc/MiniRaftClusterWithGRpc.java     | 14 +----
 .../grpc/TestNotLeaderExceptionWithGrpc.java    |  6 ---
 .../grpc/TestRaftReconfigurationWithGRpc.java   | 11 ----
 .../org/apache/ratis/grpc/TestRaftStream.java   | 37 +++++--------
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 11 ----
 .../hadooprpc/server/HadoopRpcService.java      |  6 +++
 .../ratis/netty/server/NettyRpcService.java     |  6 +++
 .../ratis/server/RaftServerConfigKeys.java      | 46 ++++++++++++++--
 .../org/apache/ratis/server/RaftServerRpc.java  |  3 +-
 .../apache/ratis/server/impl/LeaderState.java   |  6 +--
 .../ratis/server/impl/LogAppenderFactory.java   | 31 -----------
 .../ratis/server/impl/RaftServerImpl.java       | 22 +++-----
 .../apache/ratis/server/impl/ServerFactory.java | 55 ++++++++++++++++++++
 .../server/simulation/SimulatedServerRpc.java   |  6 +++
 20 files changed, 257 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/RpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/RpcType.java
new file mode 100644
index 0000000..7787613
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/RpcType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis;
+
+/** The type of RPC implementations. */
+public enum RpcType {
+  NETTY, GRPC, HADOOP, SIMULATED;
+
+  /** Same as {@link #valueOf(String)} except that this method is case insensitive. */
+  public static RpcType valueOfIgnoreCase(String s) {
+    return valueOf(s.toUpperCase());
+  }
+
+  /** An interface to get {@link RpcType}. */
+  public interface Get {
+    /** @return the {@link RpcType}. */
+    RpcType getRpcType();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
index 187f6ce..fc93398 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
@@ -1227,15 +1227,14 @@ public class RaftProperties {
    * @return property value as a <code>Class</code>,
    *         or <code>defaultValue</code>.
    */
-  public <U> Class<? extends U> getClass(String name,
-                                         Class<? extends U> defaultValue,
-                                         Class<U> xface) {
+  public <BASE, SUB extends BASE> Class<SUB> getClass(
+      String name, Class<SUB> defaultValue, Class<BASE> xface) {
     try {
       Class<?> theClass = getClass(name, defaultValue);
       if (theClass != null && !xface.isAssignableFrom(theClass))
         throw new RuntimeException(theClass+" not "+xface.getName());
       else if (theClass != null)
-        return theClass.asSubclass(xface);
+        return (Class<SUB>)theClass.asSubclass(xface);
       else
         return null;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 0b3d24e..17e2e41 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -29,13 +29,14 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 
 public abstract class RaftUtils {
   public static final Logger LOG = LoggerFactory.getLogger(RaftUtils.class);
-  private static final Class<?>[] EMPTY_CLASS_ARRAY = {};
 
   // OSType detection
   public enum OSType {
@@ -116,26 +117,55 @@ public abstract class RaftUtils {
   }
 
   /**
-   * Create an object for the given class and initialize it from conf
+   * Create an object for the given class using its default constructor.
    *
-   * @param theClass class of which an object is created
+   * @param clazz class of which an object is created
    * @return a new object
    */
-  @SuppressWarnings("unchecked")
-  public static <T> T newInstance(Class<T> theClass, Object... initArgs) {
-    T result;
+  public static <T> T newInstance(Class<T> clazz) {
+    Objects.requireNonNull(clazz, "clazz == null");
     try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      @SuppressWarnings("unchecked")
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
       if (meth == null) {
-        meth = theClass.getDeclaredConstructor(EMPTY_CLASS_ARRAY);
+        meth = clazz.getDeclaredConstructor();
         meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
+        CONSTRUCTOR_CACHE.put(clazz, meth);
       }
-      result = meth.newInstance(initArgs);
+      return meth.newInstance();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    return result;
+  }
+
+  /**
+   * Create a memoized supplier which gets a value by invoking the initializer once
+   * and then keeps returning the same value as its supplied results.
+   *
+   * @param initializer to supply at most one non-null value.
+   * @param <T> The supplier result type.
+   * @return a memoized supplier which is thread-safe.
+   */
+  public static <T> Supplier<T> memoize(Supplier<T> initializer) {
+    Objects.requireNonNull(initializer, "initializer == null");
+    return new Supplier<T>() {
+      private volatile T value = null;
+
+      @Override
+      public T get() {
+        T v = value;
+        if (v == null) {
+          synchronized (this) {
+            v = value;
+            if (v == null) {
+              v = value = Objects.requireNonNull(initializer.get(),
+                  "initializer.get() returns null");
+            }
+          }
+        }
+        return v;
+      }
+    };
   }
 
   public static int getRandomBetween(int min, int max) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 48acf35..473a5c6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc;
 
 import com.google.common.base.Preconditions;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.grpc.client.RaftClientProtocolService;
 import org.apache.ratis.grpc.server.RaftServerProtocolClient;
 import org.apache.ratis.grpc.server.RaftServerProtocolService;
@@ -109,6 +110,11 @@ public class RaftGRpcService implements RaftServerRpc {
   }
 
   @Override
+  public RpcType getRpcType() {
+    return RpcType.GRPC;
+  }
+
+  @Override
   public void start() {
     // do nothing
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
new file mode 100644
index 0000000..09e3265
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+
+public class GrpcServerFactory implements ServerFactory {
+  @Override
+  public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
+                                    FollowerInfo f) {
+    return new GRpcLogAppender(server, state, f);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
deleted file mode 100644
index d30b391..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.LogAppenderFactory;
-import org.apache.ratis.server.impl.RaftServerImpl;
-
-public class PipelinedLogAppenderFactory implements LogAppenderFactory {
-  @Override
-  public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
-                                    FollowerInfo f) {
-    return new GRpcLogAppender(server, state, f);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 85829e5..757c7ea 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -18,23 +18,18 @@
 package org.apache.ratis.grpc;
 
 import com.google.common.base.Preconditions;
-
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClientRequestSender;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.LogAppenderFactory;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.util.NetUtils;
 
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,17 +56,10 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
 
   public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
       boolean formatted) throws IOException {
-    super(ids, getPropForGrpc(properties), formatted);
+    super(ids, new RaftProperties(properties), formatted);
     init(initRpcServices(getServers(), properties));
   }
 
-  private static RaftProperties getPropForGrpc(RaftProperties prop) {
-    RaftProperties newProp = new RaftProperties(prop);
-    newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-    return newProp;
-  }
-
   private static Map<RaftPeer, RaftGRpcService> initRpcServices(
       Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException {
     final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
index 351e406..8b4c504 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
@@ -20,10 +20,6 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.ratis.server.impl.LogAppenderFactory;
-
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
 
 import java.io.IOException;
 
@@ -32,8 +28,6 @@ public class TestNotLeaderExceptionWithGrpc extends RaftNotLeaderExceptionBaseTe
   public MiniRaftCluster initCluster() throws IOException {
     String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
     RaftProperties prop = new RaftProperties();
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
     return new MiniRaftClusterWithGRpc(s, prop, true);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
index 450eb6e..ebc8a6d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
@@ -18,14 +18,9 @@
 package org.apache.ratis.grpc;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.server.impl.LogAppenderFactory;
 import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
 import org.apache.ratis.util.RaftUtils;
-import org.junit.BeforeClass;
-
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
 
 import java.io.IOException;
 
@@ -34,12 +29,6 @@ public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest
     RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
   }
 
-  @BeforeClass
-  public static void setProp() {
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
   @Override
   public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
     return new MiniRaftClusterWithGRpc(peerNum, prop);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 99e98c6..ed130dd 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -19,19 +19,16 @@ package org.apache.ratis.grpc;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.client.AppendStreamer;
 import org.apache.ratis.grpc.client.RaftOutputStream;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.ratis.server.impl.LogAppenderFactory;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.RaftUtils;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +43,6 @@ import java.util.function.Supplier;
 
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
 import static org.junit.Assert.fail;
 
 public class TestRaftStream {
@@ -57,16 +53,11 @@ public class TestRaftStream {
 
   private static final RaftProperties prop = new RaftProperties();
   private static final int NUM_SERVERS = 3;
+  private static final byte[] BYTES = new byte[4];
 
   private MiniRaftClusterWithGRpc cluster;
 
 
-  @BeforeClass
-  public static void setProp() {
-    prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
   @After
   public void tearDown() {
     if (cluster != null) {
@@ -74,12 +65,10 @@ public class TestRaftStream {
     }
   }
 
-  private byte[] genContent(int count) {
-    return toBytes(count);
-  }
-
   private byte[] toBytes(int i) {
-    byte[] b = new byte[4];
+    return toBytes(i, BYTES);
+  }
+  private byte[] toBytes(int i, byte[] b) {
     b[0] = (byte) ((i >>> 24) & 0xFF);
     b[1] = (byte) ((i >>> 16) & 0xFF);
     b[2] = (byte) ((i >>> 8) & 0xFF);
@@ -98,21 +87,20 @@ public class TestRaftStream {
     cluster.start();
     RaftServerImpl leader = waitForLeader(cluster);
 
-    int count = 1;
+    final Random r = new Random();
+    final long seed = r.nextLong();
+    r.setSeed(seed);
     try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(),
-         cluster.getPeers(), leader.getId())) {
+        cluster.getPeers(), leader.getId())) {
       for (int i = 0; i < 500; i++) { // generate 500 requests
-        out.write(genContent(count++));
+        out.write(toBytes(r.nextInt()));
       }
     }
 
     // check the leader's raft log
     final RaftLog raftLog = leader.getState().getLog();
-    final AtomicInteger currentNum = new AtomicInteger(1);
-    checkLog(raftLog, 500, () -> {
-      int value = currentNum.getAndIncrement();
-      return toBytes(value);
-    });
+    r.setSeed(seed);
+    checkLog(raftLog, 500, () -> toBytes(r.nextInt()));
   }
 
   private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
@@ -301,6 +289,7 @@ public class TestRaftStream {
     }).start();
 
     // force change the leader
+    Thread.sleep(500);
     RaftTestUtil.waitAndKillLeader(cluster, true);
     final RaftServerImpl newLeader = waitForLeader(cluster);
     Assert.assertNotEquals(leader.getId(), newLeader.getId());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index b60e30d..1ca602f 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -19,17 +19,12 @@ package org.apache.ratis.grpc;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.RaftBasicTests;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
-import org.apache.ratis.server.impl.LogAppenderFactory;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.util.RaftUtils;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
 import java.io.IOException;
 
 public class TestRaftWithGrpc extends RaftBasicTests {
@@ -39,12 +34,6 @@ public class TestRaftWithGrpc extends RaftBasicTests {
 
   private final MiniRaftClusterWithGRpc cluster;
 
-  @BeforeClass
-  public static void setProp() {
-    properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
-  }
-
   public TestRaftWithGrpc() throws IOException {
     cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties);
     Assert.assertNull(cluster.getLeader());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index eb93c5c..7f4a251 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.hadooprpc.Proxy;
 import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
 import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
@@ -102,6 +103,11 @@ public class HadoopRpcService implements RaftServerRpc {
   }
 
   @Override
+  public RpcType getRpcType() {
+    return RpcType.HADOOP;
+  }
+
+  @Override
   public InetSocketAddress getInetSocketAddress() {
     return ipcServerAddress;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index a659665..5a2bac5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -27,6 +27,7 @@ import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
 import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
 import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
@@ -124,6 +125,11 @@ public final class NettyRpcService implements RaftServerRpc {
         .bind(port);
   }
 
+  @Override
+  public RpcType getRpcType() {
+    return RpcType.NETTY;
+  }
+
   private Channel getChannel() {
     return channelFuture.awaitUninterruptibly().channel();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index d3c5173..09c77a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.server;
 
-import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.RpcType;
 import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,16 +56,52 @@ public interface RaftServerConfigKeys {
     }
   }
 
+  enum Factory {
+    NETTY("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
+    GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"),
+    HADOOP("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
+    SIMULATED("org.apache.ratis.server.impl.ServerFactory$BaseFactory");
+
+    public static String getKey(String rpcType) {
+      return RaftServerConfigKeys.PREFIX + ".factory." + rpcType + ".class";
+    }
+
+    public static Factory valueOf(RpcType rpcType) {
+      return valueOf(rpcType.name());
+    }
+
+    private final RpcType rpcType = RpcType.valueOf(name());
+    private final String key = getKey(name().toLowerCase());
+    private final String defaultClass;
+
+    Factory(String defaultClass) {
+      this.defaultClass = defaultClass;
+    }
+
+    public RpcType getRpcType() {
+      return rpcType;
+    }
+
+    public String getKey() {
+      return key;
+    }
+
+    public String getDefaultClass() {
+      return defaultClass;
+    }
+
+    @Override
+    public String toString() {
+      return getRpcType() + ":" + getKey() + ":" + getDefaultClass();
+    }
+  }
+
   String RAFT_SERVER_USE_MEMORY_LOG_KEY = "raft.server.use.memory.log";
   boolean RAFT_SERVER_USE_MEMORY_LOG_DEFAULT = false;
 
   String RAFT_SERVER_STORAGE_DIR_KEY = "raft.server.storage.dir";
   String RAFT_SERVER_STORAGE_DIR_DEFAULT = "file:///tmp/raft-server/";
 
-  String RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY = "raft.server.log.appender.factory.class";
-  Class<? extends LogAppenderFactory> RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT
-      = LogAppenderFactory.SynchronousLogAppenderFactory.class;
-
   /** whether trigger snapshot when log size exceeds limit */
   String RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY = "raft.server.auto.snapshot.enabled";
   /** by default let the state machine to decide when to do checkpoint */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 61b3b2e..e68c536 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server;
 
+import org.apache.ratis.RpcType;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 
@@ -28,7 +29,7 @@ import java.net.InetSocketAddress;
  * An server-side interface for supporting different RPC implementations
  * such as Netty, gRPC and Hadoop.
  */
-public interface RaftServerRpc extends RaftServerProtocol, Closeable {
+public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeable {
   /** To build {@link RaftServerRpc} objects. */
   abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
     private RaftServer server;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 35d120c..d5d6adc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -139,9 +139,10 @@ public class LeaderState {
     final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
     final long nextIndex = raftLog.getNextIndex();
     senders = new ArrayList<>(others.size());
+
     for (RaftPeer p : others) {
       FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
-      senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f));
+      senders.add(server.getFactory().newLogAppender(server, this, f));
     }
     voterLists = divideFollowers(conf);
   }
@@ -263,8 +264,7 @@ public class LeaderState {
     final long nextIndex = raftLog.getNextIndex();
     for (RaftPeer peer : newMembers) {
       FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false);
-      LogAppender sender = server.getLogAppenderFactory()
-          .getLogAppender(server, this, f);
+      LogAppender sender = server.getFactory().newLogAppender(server, this, f);
       senders.add(sender);
       sender.start();
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
deleted file mode 100644
index e6cc213..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.impl;
-
-public interface LogAppenderFactory {
-  LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
-                             FollowerInfo f);
-
-  class SynchronousLogAppenderFactory implements LogAppenderFactory {
-    @Override
-    public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
-                                      FollowerInfo f) {
-      return new LogAppender(server, state, f);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 30bfa04..7d9e049 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -28,6 +28,7 @@ import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
@@ -100,7 +101,7 @@ public class RaftServerImpl implements RaftServer {
 
   private RaftServerRpc serverRpc;
 
-  private final LogAppenderFactory appenderFactory;
+  private final Supplier<ServerFactory> factory ;
 
   RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
                  RaftConfiguration raftConf, RaftProperties properties)
@@ -117,7 +118,12 @@ public class RaftServerImpl implements RaftServer {
     this.properties = properties;
     this.stateMachine = stateMachine;
     this.state = new ServerState(id, raftConf, properties, this, stateMachine);
-    appenderFactory = initAppenderFactory();
+    this.factory = RaftUtils.memoize(
+        () -> ServerFactory.Util.newServerFactory(getServerRpc().getRpcType(), properties));
+  }
+
+  ServerFactory getFactory() {
+    return factory.get();
   }
 
   int getMinTimeoutMs() {
@@ -137,18 +143,6 @@ public class RaftServerImpl implements RaftServer {
     return this.stateMachine;
   }
 
-  public LogAppenderFactory getLogAppenderFactory() {
-    return appenderFactory;
-  }
-
-  private LogAppenderFactory initAppenderFactory() {
-    Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
-        RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
-        LogAppenderFactory.class);
-    return RaftUtils.newInstance(factoryClass);
-  }
-
   /**
    * Used by tests to set initial raft configuration with correct port bindings.
    */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
new file mode 100644
index 0000000..38caba7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.RpcType;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.RaftUtils;
+
+import java.util.Objects;
+
+/** A factory interface for creating server components. */
+public interface ServerFactory {
+  /** Create a new {@link LogAppender}. */
+  LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f);
+
+  class BaseFactory implements ServerFactory {
+    @Override
+    public LogAppender newLogAppender(
+        RaftServerImpl server, LeaderState state, FollowerInfo f) {
+      return new LogAppender(server, state, f);
+    }
+  }
+
+  class Util {
+    private static <T extends ServerFactory> Class<T> getClass(
+        RaftServerConfigKeys.Factory f, RaftProperties properties) {
+      final Class<T> defaultClass = (Class<T>) properties.getClassByNameOrNull(f.getDefaultClass());
+      Objects.requireNonNull(defaultClass, () -> "Failed to get the default class for " + f);
+      return properties.getClass(f.getKey(), defaultClass, ServerFactory.class);
+    }
+
+    /** Create a new {@link ServerFactory}. */
+    public static <T extends ServerFactory> T newServerFactory(
+        RpcType rpcType, RaftProperties properties) {
+      return RaftUtils.newInstance(
+          getClass(RaftServerConfigKeys.Factory.valueOf(rpcType), properties));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 09d8493..c8257ac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.ratis.RpcType;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -61,6 +62,11 @@ class SimulatedServerRpc implements RaftServerRpc {
   }
 
   @Override
+  public RpcType getRpcType() {
+    return RpcType.SIMULATED;
+  }
+
+  @Override
   public void start() {
     serverHandler.startDaemon();
     clientHandler.startDaemon();