You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/17 01:05:46 UTC
incubator-ratis git commit: RATIS-21. Add RpcType and ServerFactory.
Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 16eb8cc6b -> e1620e804
RATIS-21. Add RpcType and ServerFactory. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e1620e80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e1620e80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e1620e80
Branch: refs/heads/master
Commit: e1620e804c5be0fa735065631d45f2f6d8db4502
Parents: 16eb8cc
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Feb 16 17:05:28 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Feb 16 17:05:28 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/ratis/RpcType.java | 34 ++++++++++++
.../org/apache/ratis/conf/RaftProperties.java | 7 ++-
.../java/org/apache/ratis/util/RaftUtils.java | 52 ++++++++++++++----
.../org/apache/ratis/grpc/RaftGRpcService.java | 6 +++
.../ratis/grpc/server/GrpcServerFactory.java | 32 ++++++++++++
.../server/PipelinedLogAppenderFactory.java | 32 ------------
.../ratis/grpc/MiniRaftClusterWithGRpc.java | 14 +----
.../grpc/TestNotLeaderExceptionWithGrpc.java | 6 ---
.../grpc/TestRaftReconfigurationWithGRpc.java | 11 ----
.../org/apache/ratis/grpc/TestRaftStream.java | 37 +++++--------
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 11 ----
.../hadooprpc/server/HadoopRpcService.java | 6 +++
.../ratis/netty/server/NettyRpcService.java | 6 +++
.../ratis/server/RaftServerConfigKeys.java | 46 ++++++++++++++--
.../org/apache/ratis/server/RaftServerRpc.java | 3 +-
.../apache/ratis/server/impl/LeaderState.java | 6 +--
.../ratis/server/impl/LogAppenderFactory.java | 31 -----------
.../ratis/server/impl/RaftServerImpl.java | 22 +++-----
.../apache/ratis/server/impl/ServerFactory.java | 55 ++++++++++++++++++++
.../server/simulation/SimulatedServerRpc.java | 6 +++
20 files changed, 257 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/RpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/RpcType.java
new file mode 100644
index 0000000..7787613
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/RpcType.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+/** The type of RPC implementations. */
+public enum RpcType {
+ NETTY, GRPC, HADOOP, SIMULATED;
+
+ /** Same as {@link #valueOf(String)} except that this method is case insensitive. */
+ public static RpcType valueOfIgnoreCase(String s) {
+ return valueOf(s.toUpperCase());
+ }
+
+ /** An interface to get {@link RpcType}. */
+ public interface Get {
+ /** @return the {@link RpcType}. */
+ RpcType getRpcType();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
index 187f6ce..fc93398 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
@@ -1227,15 +1227,14 @@ public class RaftProperties {
* @return property value as a <code>Class</code>,
* or <code>defaultValue</code>.
*/
- public <U> Class<? extends U> getClass(String name,
- Class<? extends U> defaultValue,
- Class<U> xface) {
+ public <BASE, SUB extends BASE> Class<SUB> getClass(
+ String name, Class<SUB> defaultValue, Class<BASE> xface) {
try {
Class<?> theClass = getClass(name, defaultValue);
if (theClass != null && !xface.isAssignableFrom(theClass))
throw new RuntimeException(theClass+" not "+xface.getName());
else if (theClass != null)
- return theClass.asSubclass(xface);
+ return (Class<SUB>)theClass.asSubclass(xface);
else
return null;
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 0b3d24e..17e2e41 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -29,13 +29,14 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
public abstract class RaftUtils {
public static final Logger LOG = LoggerFactory.getLogger(RaftUtils.class);
- private static final Class<?>[] EMPTY_CLASS_ARRAY = {};
// OSType detection
public enum OSType {
@@ -116,26 +117,55 @@ public abstract class RaftUtils {
}
/**
- * Create an object for the given class and initialize it from conf
+ * Create an object for the given class using its default constructor.
*
- * @param theClass class of which an object is created
+ * @param clazz class of which an object is created
* @return a new object
*/
- @SuppressWarnings("unchecked")
- public static <T> T newInstance(Class<T> theClass, Object... initArgs) {
- T result;
+ public static <T> T newInstance(Class<T> clazz) {
+ Objects.requireNonNull(clazz, "clazz == null");
try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ @SuppressWarnings("unchecked")
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
if (meth == null) {
- meth = theClass.getDeclaredConstructor(EMPTY_CLASS_ARRAY);
+ meth = clazz.getDeclaredConstructor();
meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
+ CONSTRUCTOR_CACHE.put(clazz, meth);
}
- result = meth.newInstance(initArgs);
+ return meth.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
- return result;
+ }
+
+ /**
+ * Create a memoized supplier which gets a value by invoking the initializer once
+ * and then keeps returning the same value as its supplied results.
+ *
+ * @param initializer to supply at most one non-null value.
+ * @param <T> The supplier result type.
+ * @return a memoized supplier which is thread-safe.
+ */
+ public static <T> Supplier<T> memoize(Supplier<T> initializer) {
+ Objects.requireNonNull(initializer, "initializer == null");
+ return new Supplier<T>() {
+ private volatile T value = null;
+
+ @Override
+ public T get() {
+ T v = value;
+ if (v == null) {
+ synchronized (this) {
+ v = value;
+ if (v == null) {
+ v = value = Objects.requireNonNull(initializer.get(),
+ "initializer.get() returns null");
+ }
+ }
+ }
+ return v;
+ }
+ };
}
public static int getRandomBetween(int min, int max) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 48acf35..473a5c6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -19,6 +19,7 @@ package org.apache.ratis.grpc;
import com.google.common.base.Preconditions;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.RpcType;
import org.apache.ratis.grpc.client.RaftClientProtocolService;
import org.apache.ratis.grpc.server.RaftServerProtocolClient;
import org.apache.ratis.grpc.server.RaftServerProtocolService;
@@ -109,6 +110,11 @@ public class RaftGRpcService implements RaftServerRpc {
}
@Override
+ public RpcType getRpcType() {
+ return RpcType.GRPC;
+ }
+
+ @Override
public void start() {
// do nothing
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
new file mode 100644
index 0000000..09e3265
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc.server;
+
+import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.impl.RaftServerImpl;
+
+public class GrpcServerFactory implements ServerFactory {
+ @Override
+ public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
+ FollowerInfo f) {
+ return new GRpcLogAppender(server, state, f);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
deleted file mode 100644
index d30b391..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/PipelinedLogAppenderFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc.server;
-
-import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.LogAppenderFactory;
-import org.apache.ratis.server.impl.RaftServerImpl;
-
-public class PipelinedLogAppenderFactory implements LogAppenderFactory {
- @Override
- public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
- FollowerInfo f) {
- return new GRpcLogAppender(server, state, f);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 85829e5..757c7ea 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -18,23 +18,18 @@
package org.apache.ratis.grpc;
import com.google.common.base.Preconditions;
-
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClientRequestSender;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.RaftClientSenderWithGrpc;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.LogAppenderFactory;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.util.NetUtils;
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -61,17 +56,10 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
public MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
boolean formatted) throws IOException {
- super(ids, getPropForGrpc(properties), formatted);
+ super(ids, new RaftProperties(properties), formatted);
init(initRpcServices(getServers(), properties));
}
- private static RaftProperties getPropForGrpc(RaftProperties prop) {
- RaftProperties newProp = new RaftProperties(prop);
- newProp.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
- return newProp;
- }
-
private static Map<RaftPeer, RaftGRpcService> initRpcServices(
Collection<RaftServerImpl> servers, RaftProperties prop) throws IOException {
final Map<RaftPeer, RaftGRpcService> peerRpcs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
index 351e406..8b4c504 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestNotLeaderExceptionWithGrpc.java
@@ -20,10 +20,6 @@ package org.apache.ratis.grpc;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftNotLeaderExceptionBaseTest;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.ratis.server.impl.LogAppenderFactory;
-
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
import java.io.IOException;
@@ -32,8 +28,6 @@ public class TestNotLeaderExceptionWithGrpc extends RaftNotLeaderExceptionBaseTe
public MiniRaftCluster initCluster() throws IOException {
String[] s = MiniRaftCluster.generateIds(NUM_PEERS, 0);
RaftProperties prop = new RaftProperties();
- prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
return new MiniRaftClusterWithGRpc(s, prop, true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
index 450eb6e..ebc8a6d 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftReconfigurationWithGRpc.java
@@ -18,14 +18,9 @@
package org.apache.ratis.grpc;
import org.apache.log4j.Level;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
import org.apache.ratis.grpc.server.RaftServerProtocolService;
-import org.apache.ratis.server.impl.LogAppenderFactory;
import org.apache.ratis.server.impl.RaftReconfigurationBaseTest;
import org.apache.ratis.util.RaftUtils;
-import org.junit.BeforeClass;
-
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
import java.io.IOException;
@@ -34,12 +29,6 @@ public class TestRaftReconfigurationWithGRpc extends RaftReconfigurationBaseTest
RaftUtils.setLogLevel(RaftServerProtocolService.LOG, Level.DEBUG);
}
- @BeforeClass
- public static void setProp() {
- prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
- }
-
@Override
public MiniRaftClusterWithGRpc getCluster(int peerNum) throws IOException {
return new MiniRaftClusterWithGRpc(peerNum, prop);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index 99e98c6..ed130dd 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -19,19 +19,16 @@ package org.apache.ratis.grpc;
import org.apache.log4j.Level;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.AppendStreamer;
import org.apache.ratis.grpc.client.RaftOutputStream;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
-import org.apache.ratis.server.impl.LogAppenderFactory;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.RaftUtils;
import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +43,6 @@ import java.util.function.Supplier;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
import static org.junit.Assert.fail;
public class TestRaftStream {
@@ -57,16 +53,11 @@ public class TestRaftStream {
private static final RaftProperties prop = new RaftProperties();
private static final int NUM_SERVERS = 3;
+ private static final byte[] BYTES = new byte[4];
private MiniRaftClusterWithGRpc cluster;
- @BeforeClass
- public static void setProp() {
- prop.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
- }
-
@After
public void tearDown() {
if (cluster != null) {
@@ -74,12 +65,10 @@ public class TestRaftStream {
}
}
- private byte[] genContent(int count) {
- return toBytes(count);
- }
-
private byte[] toBytes(int i) {
- byte[] b = new byte[4];
+ return toBytes(i, BYTES);
+ }
+ private byte[] toBytes(int i, byte[] b) {
b[0] = (byte) ((i >>> 24) & 0xFF);
b[1] = (byte) ((i >>> 16) & 0xFF);
b[2] = (byte) ((i >>> 8) & 0xFF);
@@ -98,21 +87,20 @@ public class TestRaftStream {
cluster.start();
RaftServerImpl leader = waitForLeader(cluster);
- int count = 1;
+ final Random r = new Random();
+ final long seed = r.nextLong();
+ r.setSeed(seed);
try (RaftOutputStream out = new RaftOutputStream(prop, ClientId.createId(),
- cluster.getPeers(), leader.getId())) {
+ cluster.getPeers(), leader.getId())) {
for (int i = 0; i < 500; i++) { // generate 500 requests
- out.write(genContent(count++));
+ out.write(toBytes(r.nextInt()));
}
}
// check the leader's raft log
final RaftLog raftLog = leader.getState().getLog();
- final AtomicInteger currentNum = new AtomicInteger(1);
- checkLog(raftLog, 500, () -> {
- int value = currentNum.getAndIncrement();
- return toBytes(value);
- });
+ r.setSeed(seed);
+ checkLog(raftLog, 500, () -> toBytes(r.nextInt()));
}
private void checkLog(RaftLog raftLog, long expectedCommittedIndex,
@@ -301,6 +289,7 @@ public class TestRaftStream {
}).start();
// force change the leader
+ Thread.sleep(500);
RaftTestUtil.waitAndKillLeader(cluster, true);
final RaftServerImpl newLeader = waitForLeader(cluster);
Assert.assertNotEquals(leader.getId(), newLeader.getId());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index b60e30d..1ca602f 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -19,17 +19,12 @@ package org.apache.ratis.grpc;
import org.apache.log4j.Level;
import org.apache.ratis.RaftBasicTests;
-import org.apache.ratis.grpc.server.PipelinedLogAppenderFactory;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
-import org.apache.ratis.server.impl.LogAppenderFactory;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.util.RaftUtils;
import org.junit.Assert;
-import org.junit.BeforeClass;
import org.junit.Test;
-import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY;
-
import java.io.IOException;
public class TestRaftWithGrpc extends RaftBasicTests {
@@ -39,12 +34,6 @@ public class TestRaftWithGrpc extends RaftBasicTests {
private final MiniRaftClusterWithGRpc cluster;
- @BeforeClass
- public static void setProp() {
- properties.setClass(RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- PipelinedLogAppenderFactory.class, LogAppenderFactory.class);
- }
-
public TestRaftWithGrpc() throws IOException {
cluster = new MiniRaftClusterWithGRpc(NUM_SERVERS, properties);
Assert.assertNull(cluster.getLeader());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index eb93c5c..7f4a251 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
import org.apache.hadoop.ipc.RPC;
+import org.apache.ratis.RpcType;
import org.apache.ratis.hadooprpc.Proxy;
import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
@@ -102,6 +103,11 @@ public class HadoopRpcService implements RaftServerRpc {
}
@Override
+ public RpcType getRpcType() {
+ return RpcType.HADOOP;
+ }
+
+ @Override
public InetSocketAddress getInetSocketAddress() {
return ipcServerAddress;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index a659665..5a2bac5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -27,6 +27,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.RpcType;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
@@ -124,6 +125,11 @@ public final class NettyRpcService implements RaftServerRpc {
.bind(port);
}
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.NETTY;
+ }
+
private Channel getChannel() {
return channelFuture.awaitUninterruptibly().channel();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index d3c5173..09c77a9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.server;
-import org.apache.ratis.server.impl.LogAppenderFactory;
+import org.apache.ratis.RpcType;
import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,16 +56,52 @@ public interface RaftServerConfigKeys {
}
}
+ enum Factory {
+ NETTY("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
+ GRPC("org.apache.ratis.grpc.server.GrpcServerFactory"),
+ HADOOP("org.apache.ratis.server.impl.ServerFactory$BaseFactory"),
+ SIMULATED("org.apache.ratis.server.impl.ServerFactory$BaseFactory");
+
+ public static String getKey(String rpcType) {
+ return RaftServerConfigKeys.PREFIX + ".factory." + rpcType + ".class";
+ }
+
+ public static Factory valueOf(RpcType rpcType) {
+ return valueOf(rpcType.name());
+ }
+
+ private final RpcType rpcType = RpcType.valueOf(name());
+ private final String key = getKey(name().toLowerCase());
+ private final String defaultClass;
+
+ Factory(String defaultClass) {
+ this.defaultClass = defaultClass;
+ }
+
+ public RpcType getRpcType() {
+ return rpcType;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getDefaultClass() {
+ return defaultClass;
+ }
+
+ @Override
+ public String toString() {
+ return getRpcType() + ":" + getKey() + ":" + getDefaultClass();
+ }
+ }
+
String RAFT_SERVER_USE_MEMORY_LOG_KEY = "raft.server.use.memory.log";
boolean RAFT_SERVER_USE_MEMORY_LOG_DEFAULT = false;
String RAFT_SERVER_STORAGE_DIR_KEY = "raft.server.storage.dir";
String RAFT_SERVER_STORAGE_DIR_DEFAULT = "file:///tmp/raft-server/";
- String RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY = "raft.server.log.appender.factory.class";
- Class<? extends LogAppenderFactory> RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT
- = LogAppenderFactory.SynchronousLogAppenderFactory.class;
-
/** whether trigger snapshot when log size exceeds limit */
String RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY = "raft.server.auto.snapshot.enabled";
/** by default let the state machine to decide when to do checkpoint */
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 61b3b2e..e68c536 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server;
+import org.apache.ratis.RpcType;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -28,7 +29,7 @@ import java.net.InetSocketAddress;
* An server-side interface for supporting different RPC implementations
* such as Netty, gRPC and Hadoop.
*/
-public interface RaftServerRpc extends RaftServerProtocol, Closeable {
+public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeable {
/** To build {@link RaftServerRpc} objects. */
abstract class Builder<B extends Builder, RPC extends RaftServerRpc> {
private RaftServer server;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 35d120c..d5d6adc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -139,9 +139,10 @@ public class LeaderState {
final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
final long nextIndex = raftLog.getNextIndex();
senders = new ArrayList<>(others.size());
+
for (RaftPeer p : others) {
FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
- senders.add(server.getLogAppenderFactory().getLogAppender(server, this, f));
+ senders.add(server.getFactory().newLogAppender(server, this, f));
}
voterLists = divideFollowers(conf);
}
@@ -263,8 +264,7 @@ public class LeaderState {
final long nextIndex = raftLog.getNextIndex();
for (RaftPeer peer : newMembers) {
FollowerInfo f = new FollowerInfo(peer, t, nextIndex, false);
- LogAppender sender = server.getLogAppenderFactory()
- .getLogAppender(server, this, f);
+ LogAppender sender = server.getFactory().newLogAppender(server, this, f);
senders.add(sender);
sender.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
deleted file mode 100644
index e6cc213..0000000
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.server.impl;
-
-public interface LogAppenderFactory {
- LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
- FollowerInfo f);
-
- class SynchronousLogAppenderFactory implements LogAppenderFactory {
- @Override
- public LogAppender getLogAppender(RaftServerImpl server, LeaderState state,
- FollowerInfo f) {
- return new LogAppender(server, state, f);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 30bfa04..7d9e049 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -28,6 +28,7 @@ import java.io.InterruptedIOException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
@@ -100,7 +101,7 @@ public class RaftServerImpl implements RaftServer {
private RaftServerRpc serverRpc;
- private final LogAppenderFactory appenderFactory;
+ private final Supplier<ServerFactory> factory ;
RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
RaftConfiguration raftConf, RaftProperties properties)
@@ -117,7 +118,12 @@ public class RaftServerImpl implements RaftServer {
this.properties = properties;
this.stateMachine = stateMachine;
this.state = new ServerState(id, raftConf, properties, this, stateMachine);
- appenderFactory = initAppenderFactory();
+ this.factory = RaftUtils.memoize(
+ () -> ServerFactory.Util.newServerFactory(getServerRpc().getRpcType(), properties));
+ }
+
+ ServerFactory getFactory() {
+ return factory.get();
}
int getMinTimeoutMs() {
@@ -137,18 +143,6 @@ public class RaftServerImpl implements RaftServer {
return this.stateMachine;
}
- public LogAppenderFactory getLogAppenderFactory() {
- return appenderFactory;
- }
-
- private LogAppenderFactory initAppenderFactory() {
- Class<? extends LogAppenderFactory> factoryClass = properties.getClass(
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_KEY,
- RaftServerConfigKeys.RAFT_SERVER_LOG_APPENDER_FACTORY_CLASS_DEFAULT,
- LogAppenderFactory.class);
- return RaftUtils.newInstance(factoryClass);
- }
-
/**
* Used by tests to set initial raft configuration with correct port bindings.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
new file mode 100644
index 0000000..38caba7
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.RpcType;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.RaftUtils;
+
+import java.util.Objects;
+
+/** A factory interface for creating server components. */
+public interface ServerFactory {
+ /** Create a new {@link LogAppender}. */
+ LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f);
+
+ class BaseFactory implements ServerFactory {
+ @Override
+ public LogAppender newLogAppender(
+ RaftServerImpl server, LeaderState state, FollowerInfo f) {
+ return new LogAppender(server, state, f);
+ }
+ }
+
+ class Util {
+ private static <T extends ServerFactory> Class<T> getClass(
+ RaftServerConfigKeys.Factory f, RaftProperties properties) {
+ final Class<T> defaultClass = (Class<T>) properties.getClassByNameOrNull(f.getDefaultClass());
+ Objects.requireNonNull(defaultClass, () -> "Failed to get the default class for " + f);
+ return properties.getClass(f.getKey(), defaultClass, ServerFactory.class);
+ }
+
+ /** Create a new {@link ServerFactory}. */
+ public static <T extends ServerFactory> T newServerFactory(
+ RpcType rpcType, RaftProperties properties) {
+ return RaftUtils.newInstance(
+ getClass(RaftServerConfigKeys.Factory.valueOf(rpcType), properties));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e1620e80/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 09d8493..c8257ac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.ratis.RpcType;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -61,6 +62,11 @@ class SimulatedServerRpc implements RaftServerRpc {
}
@Override
+ public RpcType getRpcType() {
+ return RpcType.SIMULATED;
+ }
+
+ @Override
public void start() {
serverHandler.startDaemon();
clientHandler.startDaemon();