You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/03/04 01:30:44 UTC
incubator-ratis git commit: RATIS-30. Provide a way to pass
parameters to rpc implementations.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 5a94eac05 -> 46116d412
RATIS-30. Provide a way to pass parameters to rpc implementations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/46116d41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/46116d41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/46116d41
Branch: refs/heads/master
Commit: 46116d412add72d761ebd6a9325362dc19227459
Parents: 5a94eac
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Fri Mar 3 17:30:18 2017 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Fri Mar 3 17:30:18 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/ratis/RaftConfigKeys.java | 3 +-
.../java/org/apache/ratis/conf/Parameters.java | 66 +++++++++
.../main/java/org/apache/ratis/rpc/RpcType.java | 5 +-
.../org/apache/ratis/rpc/SupportedRpcType.java | 9 +-
.../java/org/apache/ratis/util/RaftUtils.java | 44 +++---
.../ratis/examples/RaftExamplesTestUtil.java | 2 +-
.../java/org/apache/ratis/grpc/GrpcFactory.java | 3 +
.../ratis/grpc/MiniRaftClusterWithGRpc.java | 51 +++----
.../ratis/hadooprpc/HadoopConfigKeys.java | 68 ++++++++++
.../apache/ratis/hadooprpc/HadoopFactory.java | 25 +++-
.../server/HadoopRpcServerConfigKeys.java | 56 --------
.../hadooprpc/server/HadoopRpcService.java | 5 +-
.../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 47 +++----
.../org/apache/ratis/netty/NettyFactory.java | 3 +
.../ratis/netty/MiniRaftClusterWithNetty.java | 24 ++--
.../org/apache/ratis/server/RaftServer.java | 12 +-
.../ratis/server/impl/RaftServerImpl.java | 12 +-
.../ratis/server/impl/ServerImplUtils.java | 13 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 133 +++++++++----------
.../impl/RaftReconfigurationBaseTest.java | 2 +-
.../MiniRaftClusterWithSimulatedRpc.java | 95 +++++--------
.../simulation/SimulatedRequestReply.java | 11 +-
.../ratis/server/simulation/SimulatedRpc.java | 32 +++--
23 files changed, 412 insertions(+), 309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
index adf393b..a8bc57d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -43,7 +43,8 @@ public interface RaftConfigKeys {
}
// Try using it as a class name
- return RaftUtils.newInstance(t, properties, RpcType.class);
+ return RaftUtils.newInstance(
+ RaftUtils.getClass(t, properties, RpcType.class));
}
static void setType(BiConsumer<String, String> setRpcType, RpcType type) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
new file mode 100644
index 0000000..f562d24
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/Parameters.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.conf;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A generic parameter map.
+ * The difference between this class and {@link RaftProperties} is that
+ * {@link RaftProperties} is {@link String} based, i.e. properties are strings,
+ * while this class is {@link Object} based, i.e. parameters can be any objects.
+ *
+ * Null keys or null values are not supported.
+ *
+ * This class is thread safe.
+ */
+public class Parameters {
+ private final Map<String, Object> map = new ConcurrentHashMap<>();
+
+ /** Put the key-value pair to the map. */
+ public <T> T put(String key, T value, Class<T> valueClass) {
+ return valueClass.cast(map.put(
+ Objects.requireNonNull(key, "key is null"),
+ Objects.requireNonNull(value, () -> "value is null, key=" + key)));
+ }
+
+ /**
+ * @param <T> The value type.
+ * @return The value mapped to the given key;
+ * or return null if the key does not map to any value.
+ * @throws ClassCastException if the mapped value is not an instance of the given class.
+ */
+ public <T> T get(String key, Class<T> valueClass) {
+ final Object value = map.get(Objects.requireNonNull(key, "key is null"));
+ return valueClass.cast(value);
+ }
+
+ /**
+ * The same as {@link #get(String, Class)} except that this method throws
+ * a {@link NullPointerException} if the key does not map to any value.
+ */
+ public <T> T getNonNull(String key, Class<T> valueClass) {
+ final T value = get(key, valueClass);
+ if (value != null) {
+ return value;
+ }
+ throw new NullPointerException("The key " + key + " does not map to any value.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
index a8085fe..701e979 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcType.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.rpc;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
/** The type of RPC implementations. */
@@ -24,8 +25,8 @@ public interface RpcType {
/** @return the name of the rpc type. */
String name();
- /** @return a new factory created using the given properties. */
- RpcFactory newFactory(RaftProperties properties);
+ /** @return a new factory created using the given properties and parameters. */
+ RpcFactory newFactory(RaftProperties properties, Parameters parameters);
/** An interface to get {@link RpcType}. */
interface Get {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
index dcba59b..f1d8fac 100644
--- a/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/SupportedRpcType.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.rpc;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.util.RaftUtils;
@@ -31,6 +32,8 @@ public enum SupportedRpcType implements RpcType {
return valueOf(s.toUpperCase());
}
+ private static final Class<?>[] ARG_CLASSES = {Parameters.class};
+
private final String factoryClassName;
SupportedRpcType(String factoryClassName) {
@@ -38,7 +41,9 @@ public enum SupportedRpcType implements RpcType {
}
@Override
- public RpcFactory newFactory(RaftProperties properties) {
- return RaftUtils.newInstance(factoryClassName, properties, RpcFactory.class);
+ public RpcFactory newFactory(RaftProperties properties, Parameters parameters) {
+ final Class<? extends RpcFactory> clazz = RaftUtils.getClass(
+ factoryClassName, properties, RpcFactory.class);
+ return RaftUtils.newInstance(clazz, ARG_CLASSES, parameters);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
index 7f5703d..02b227c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/RaftUtils.java
@@ -28,9 +28,7 @@ import java.io.*;
import java.lang.reflect.Constructor;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
@@ -85,12 +83,13 @@ public abstract class RaftUtils {
public static final boolean PPC_64
= System.getProperties().getProperty("os.arch").contains("ppc64");
+ public static final Class<?>[] EMPTY_CLASSES = {};
/**
* Cache of constructors for each class. Pins the classes so they
* can't be garbage collected until ReflectionUtils can be collected.
*/
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<>();
+ private static final Map<List<Class<?>>, Constructor<?>> CONSTRUCTOR_CACHE
+ = new ConcurrentHashMap<>();
public static InterruptedIOException toInterruptedIOException(
String message, InterruptedException e) {
@@ -120,31 +119,40 @@ public abstract class RaftUtils {
/**
* Create an object for the given class using its default constructor.
+ */
+ public static <T> T newInstance(Class<T> clazz) {
+ return newInstance(clazz, EMPTY_CLASSES);
+ }
+
+ /**
+ * Create an object for the given class using the specified constructor.
*
* @param clazz class of which an object is created
+ * @param argClasses argument classes of the constructor
+ * @param args actual arguments to be passed to the constructor
+ * @param <T> class type of clazz
* @return a new object
*/
- public static <T> T newInstance(Class<T> clazz) {
+ public static <T> T newInstance(Class<T> clazz, Class<?>[] argClasses, Object... args) {
Objects.requireNonNull(clazz, "clazz == null");
try {
+ final List<Class<?>> key = new ArrayList<>();
+ key.add(clazz);
+ key.addAll(Arrays.asList(argClasses));
+
@SuppressWarnings("unchecked")
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
- if (meth == null) {
- meth = clazz.getDeclaredConstructor();
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(clazz, meth);
+ Constructor<T> ctor = (Constructor<T>) CONSTRUCTOR_CACHE.get(key);
+ if (ctor == null) {
+ ctor = clazz.getDeclaredConstructor(argClasses);
+ ctor.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(key, ctor);
}
- return meth.newInstance();
+ return ctor.newInstance(args);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- public static <BASE, SUB extends BASE> SUB newInstance(
- String subClassName, RaftProperties properties, Class<BASE> base) {
- return newInstance(getClass(subClassName, properties, base));
- }
-
public static <BASE, SUB extends BASE> Class<SUB> getClass(
String subClassName, RaftProperties properties, Class<BASE> base) {
try {
@@ -309,8 +317,8 @@ public abstract class RaftUtils {
public static <INPUT, OUTPUT> Iterable<OUTPUT> as(
Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
- final Iterator<INPUT> i = iteration.iterator();
return () -> new Iterator<OUTPUT>() {
+ final Iterator<INPUT> i = iteration.iterator();
@Override
public boolean hasNext() {
return i.hasNext();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
index f54b766..7804353 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
@@ -37,7 +37,7 @@ public class RaftExamplesTestUtil {
Collection<Object[]> clusters, MiniRaftCluster.Factory factory,
String[] ids, RaftProperties properties)
throws IOException {
- clusters.add(new Object[]{factory.newCluster(ids, properties, true)});
+ clusters.add(new Object[]{factory.newCluster(ids, properties)});
}
public static Collection<Object[]> getMiniRaftClusters(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 3ae2602..60d6ef6 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -18,12 +18,15 @@
package org.apache.ratis.grpc;
import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.grpc.server.GRpcLogAppender;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.*;
public class GrpcFactory implements ServerFactory, ClientFactory {
+ public GrpcFactory(Parameters parameters) {}
+
@Override
public SupportedRpcType getRpcType() {
return SupportedRpcType.GRPC;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 4c7d74d..9676a48 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -21,61 +21,48 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
-import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.*;
+import org.apache.ratis.statemachine.StateMachine;
-import java.util.Collection;
+import java.io.IOException;
public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithGRpc> FACTORY
= new Factory<MiniRaftClusterWithGRpc>() {
@Override
public MiniRaftClusterWithGRpc newCluster(
- String[] ids, RaftProperties prop, boolean formatted) {
+ String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.GRPC);
- return new MiniRaftClusterWithGRpc(ids, prop, formatted);
+ return new MiniRaftClusterWithGRpc(ids, prop);
}
};
public static final DelayLocalExecutionInjection sendServerRequestInjection =
new DelayLocalExecutionInjection(RaftGRpcService.GRPC_SEND_SERVER_REQUEST);
- private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties,
- boolean formatted) {
- super(ids, properties, formatted);
+ private MiniRaftClusterWithGRpc(String[] ids, RaftProperties properties) {
+ super(ids, properties, null);
}
@Override
- protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
- final RaftServerImpl s = super.newRaftServer(id, format);
- s.getProperties().setInt(
- RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(s));
- return s;
+ protected RaftServerImpl newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties) throws IOException {
+ properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(id, conf));
+ return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
}
@Override
- protected Collection<RaftPeer> addNewPeers(
- Collection<RaftServerImpl> newServers, boolean startService) {
- final Collection<RaftPeer> peers = toRaftPeers(newServers);
- for (RaftPeer p: peers) {
- final RaftServerImpl server = servers.get(p.getId());
- if (!startService) {
- BlockRequestHandlingInjection.getInstance().blockReplier(server.getId().toString());
- } else {
- server.start();
- }
+ protected void startServer(RaftServerImpl server, boolean startService) {
+ final String id = server.getId().toString();
+ if (startService) {
+ server.start();
+ BlockRequestHandlingInjection.getInstance().unblockReplier(id);
+ } else {
+ BlockRequestHandlingInjection.getInstance().blockReplier(id);
}
- return peers;
- }
-
- @Override
- public void startServer(RaftPeerId id) {
- super.startServer(id);
- BlockRequestHandlingInjection.getInstance().unblockReplier(id.toString());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
new file mode 100644
index 0000000..cc09510
--- /dev/null
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.hadooprpc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.Parameters;
+
+import java.net.InetSocketAddress;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+
+/** Hadoop Rpc specific configuration properties. */
+public interface HadoopConfigKeys {
+ String PREFIX = "raft.hadooprpc";
+
+ String CONF_KEY = PREFIX + ".conf";
+
+ static Configuration getConf(
+ BiFunction<String, Class<Configuration>, Configuration> getConf) {
+ return getConf.apply(CONF_KEY, Configuration.class);
+ }
+
+ static void setConf(Parameters parameters, Configuration conf) {
+ parameters.put(CONF_KEY, conf, Configuration.class);
+ }
+
+ /** IPC server configurations */
+ interface Ipc {
+ String PREFIX = HadoopConfigKeys.PREFIX + ".ipc";
+
+ String ADDRESS_KEY = PREFIX + ".address";
+ int DEFAULT_PORT = 10718;
+ String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT;
+
+ String HANDLERS_KEY = PREFIX + ".handlers";
+ int HANDLERS_DEFAULT = 10;
+
+ static int handlers(BiFunction<String, Integer, Integer> getInt) {
+ return ConfUtils.getInt(getInt,
+ HANDLERS_KEY, HANDLERS_DEFAULT, 1, null);
+ }
+
+ static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) {
+ return ConfUtils.getInetSocketAddress(getTrimmed,
+ ADDRESS_KEY, ADDRESS_DEFAULT);
+ }
+
+ static void setAddress(BiConsumer<String, String> setString, String address) {
+ ConfUtils.set(setString, ADDRESS_KEY, address);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index 7b9e20f..ad866c8 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -19,6 +19,7 @@ package org.apache.ratis.hadooprpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.hadooprpc.client.HadoopClientRpc;
import org.apache.ratis.hadooprpc.server.HadoopRpcService;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -26,10 +27,24 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory {
- private Configuration conf;
+ public static Parameters newRaftParameters(Configuration conf) {
+ final Parameters p = new Parameters();
+ HadoopConfigKeys.setConf(p, conf);
+ return p;
+ }
+
+ private final Configuration conf;
+
+ public HadoopFactory(Parameters parameters) {
+ this(HadoopConfigKeys.getConf(parameters::get));
+ }
+
+ public HadoopFactory(Configuration conf) {
+ this.conf = conf != null? conf: new Configuration();
+ }
- public void setConf(Configuration conf) {
- this.conf = conf;
+ public Configuration getConf() {
+ return conf;
}
@Override
@@ -41,12 +56,12 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa
public HadoopRpcService newRaftServerRpc(RaftServerImpl server) {
return HadoopRpcService.newBuilder()
.setServer(server)
- .setConf(conf)
+ .setConf(getConf())
.build();
}
@Override
public HadoopClientRpc newRaftClientRpc() {
- return new HadoopClientRpc(conf);
+ return new HadoopClientRpc(getConf());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
deleted file mode 100644
index bfdf05b..0000000
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcServerConfigKeys.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.hadooprpc.server;
-
-import org.apache.ratis.conf.ConfUtils;
-
-import java.net.InetSocketAddress;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-
-public interface HadoopRpcServerConfigKeys {
- String PREFIX = "raft.hadooprpc";
-
- /** IPC server configurations */
- abstract class Ipc {
- public static final String PREFIX = HadoopRpcServerConfigKeys.PREFIX + ".ipc";
-
- public static final String ADDRESS_KEY = PREFIX + ".address";
- public static final int DEFAULT_PORT = 10718;
- public static final String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT;
-
- public static final String HANDLERS_KEY = PREFIX + ".handlers";
- public static final int HANDLERS_DEFAULT = 10;
-
- public static int handlers(BiFunction<String, Integer, Integer> getInt) {
- return ConfUtils.getInt(getInt,
- HANDLERS_KEY, HANDLERS_DEFAULT, 1, null);
- }
-
- public static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) {
- return ConfUtils.getInetSocketAddress(getTrimmed,
- ADDRESS_KEY, ADDRESS_DEFAULT);
- }
-
- public static void setAddress(
- BiConsumer<String, String> setString,
- String address) {
- ConfUtils.set(setString, ADDRESS_KEY, address);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 87fd2e2..00d69aa 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -20,6 +20,7 @@ package org.apache.ratis.hadooprpc.server;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngineShaded;
import org.apache.hadoop.ipc.RPC;
+import org.apache.ratis.hadooprpc.HadoopConfigKeys;
import org.apache.ratis.hadooprpc.Proxy;
import org.apache.ratis.hadooprpc.client.RaftClientProtocolPB;
import org.apache.ratis.hadooprpc.client.RaftClientProtocolServerSideTranslatorPB;
@@ -118,8 +119,8 @@ public class HadoopRpcService implements RaftServerRpc {
private static RPC.Server newRpcServer(
RaftServerProtocol serverProtocol, final Configuration conf)
throws IOException {
- final int handlerCount = HadoopRpcServerConfigKeys.Ipc.handlers(conf::getInt);
- final InetSocketAddress address = HadoopRpcServerConfigKeys.Ipc.address(conf::getTrimmed);
+ final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf::getInt);
+ final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf::getTrimmed);
final BlockingService service
= RaftServerProtocolService.newReflectiveBlockingService(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index 76acafd..b3a607b 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -22,36 +22,39 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.hadooprpc.server.HadoopRpcServerConfigKeys;
import org.apache.ratis.hadooprpc.server.HadoopRpcService;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
static final Logger LOG = LoggerFactory.getLogger(MiniRaftClusterWithHadoopRpc.class);
public static class Factory extends MiniRaftCluster.Factory<MiniRaftClusterWithHadoopRpc> {
@Override
- public MiniRaftClusterWithHadoopRpc newCluster(
- String[] ids, RaftProperties prop, boolean formatted) {
+ public MiniRaftClusterWithHadoopRpc newCluster(String[] ids, RaftProperties prop) {
final Configuration conf = new Configuration();
- return newCluster(ids, prop, conf, formatted);
+ return newCluster(ids, prop, conf);
}
public MiniRaftClusterWithHadoopRpc newCluster(
int numServers, RaftProperties properties, Configuration conf) {
- return newCluster(generateIds(numServers, 0), properties, conf, true);
+ return newCluster(generateIds(numServers, 0), properties, conf);
}
public MiniRaftClusterWithHadoopRpc newCluster(
- String[] ids, RaftProperties prop, Configuration conf, boolean formatted) {
+ String[] ids, RaftProperties prop, Configuration conf) {
RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.HADOOP);
- HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
- return new MiniRaftClusterWithHadoopRpc(ids, prop, conf, formatted);
+ HadoopConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
+ return new MiniRaftClusterWithHadoopRpc(ids, prop, conf);
}
}
@@ -63,27 +66,21 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
private final Configuration hadoopConf;
private MiniRaftClusterWithHadoopRpc(String[] ids, RaftProperties properties,
- Configuration hadoopConf, boolean formatted) {
- super(ids, properties, formatted);
+ Configuration hadoopConf) {
+ super(ids, properties, HadoopFactory.newRaftParameters(hadoopConf));
this.hadoopConf = hadoopConf;
- getServers().stream().forEach(s -> setConf(s));
- ((HadoopFactory)clientFactory).setConf(hadoopConf);
- }
-
- private void setConf(RaftServerImpl server) {
- final Configuration conf = new Configuration(hadoopConf);
- final String address = "0.0.0.0:" + getPort(server);
- HadoopRpcServerConfigKeys.Ipc.setAddress(conf::set, address);
- ((HadoopFactory)server.getFactory()).setConf(conf);
}
@Override
- protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
- final RaftServerImpl s = super.newRaftServer(id, format);
- if (hadoopConf != null) {
- setConf(s);
- }
- return s;
+ protected RaftServerImpl newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties) throws IOException {
+ final Configuration hconf = new Configuration(hadoopConf);
+ final String address = "0.0.0.0:" + getPort(id, conf);
+ HadoopConfigKeys.Ipc.setAddress(hconf::set, address);
+
+ return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
+ HadoopFactory.newRaftParameters(hconf));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 525b991..6dcfd15 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -18,6 +18,7 @@
package org.apache.ratis.netty;
import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.netty.client.NettyClientRpc;
import org.apache.ratis.netty.server.NettyRpcService;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -25,6 +26,8 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory {
+ public NettyFactory(Parameters parameters) {}
+
@Override
public SupportedRpcType getRpcType() {
return SupportedRpcType.NETTY;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 29857dd..697aef6 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -25,32 +25,36 @@ import org.apache.ratis.netty.server.NettyRpcService;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.io.IOException;
public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithNetty> FACTORY
= new Factory<MiniRaftClusterWithNetty>() {
@Override
- public MiniRaftClusterWithNetty newCluster(
- String[] ids, RaftProperties prop, boolean formatted) {
+ public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.NETTY);
- return new MiniRaftClusterWithNetty(ids, prop, formatted);
+ return new MiniRaftClusterWithNetty(ids, prop);
}
};
public static final DelayLocalExecutionInjection sendServerRequest
= new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
- private MiniRaftClusterWithNetty(
- String[] ids, RaftProperties properties, boolean formatted) {
- super(ids, properties, formatted);
+ private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
+ super(ids, properties, null);
}
@Override
- protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
- final RaftServerImpl s = super.newRaftServer(id, format);
- NettyConfigKeys.Server.setPort(s.getProperties()::setInt, getPort(s));
- return s;
+ protected RaftServerImpl newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties) throws IOException {
+ NettyConfigKeys.Server.setPort(properties::setInt, getPort(id, conf));
+ return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 7417e0d..dbd32b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -17,13 +17,13 @@
*/
package org.apache.ratis.server;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerProtocol;
@@ -65,6 +65,7 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
private StateMachine stateMachine;
private Iterable<RaftPeer> peers;
private RaftProperties properties;
+ private Parameters parameters;
/** @return a {@link RaftServer} object. */
public RaftServer build() throws IOException {
@@ -72,7 +73,8 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
Objects.requireNonNull(serverId, "The 'serverId' field is not initialized."),
Objects.requireNonNull(stateMachine, "The 'stateMachine' is not initialized."),
Objects.requireNonNull(peers, "The 'peers' field is not initialized."),
- Objects.requireNonNull(properties, "The 'properties' field is not initialized."));
+ Objects.requireNonNull(properties, "The 'properties' field is not initialized."),
+ parameters);
}
/** Set the server ID. */
@@ -98,5 +100,11 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol,
this.properties = properties;
return this;
}
+
+ /** Set {@link Parameters}. */
+ public Builder setParameters(Parameters parameters) {
+ this.parameters = parameters;
+ return this;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index d729914..b9f063e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
@@ -47,7 +48,6 @@ import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*;
import static org.apache.ratis.util.LifeCycle.State.*;
@@ -84,12 +84,12 @@ public class RaftServerImpl implements RaftServer {
/** used when the peer is leader */
private volatile LeaderState leaderState;
- private final Supplier<RaftServerRpc> serverRpc;
+ private final RaftServerRpc serverRpc;
private final ServerFactory factory;
RaftServerImpl(RaftPeerId id, StateMachine stateMachine,
- RaftConfiguration raftConf, RaftProperties properties)
+ RaftConfiguration raftConf, RaftProperties properties, Parameters parameters)
throws IOException {
this.lifeCycle = new LifeCycle(id);
minTimeoutMs = properties.getInt(
@@ -105,8 +105,8 @@ public class RaftServerImpl implements RaftServer {
this.state = new ServerState(id, raftConf, properties, this, stateMachine);
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
- this.factory = ServerFactory.cast(rpcType.newFactory(properties));
- this.serverRpc = RaftUtils.memoize(() -> initRaftServerRpc());
+ this.factory = ServerFactory.cast(rpcType.newFactory(properties, parameters));
+ this.serverRpc = initRaftServerRpc();
}
@Override
@@ -147,7 +147,7 @@ public class RaftServerImpl implements RaftServer {
}
public RaftServerRpc getServerRpc() {
- return serverRpc.get();
+ return serverRpc;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 5248906..30d6e29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -30,17 +31,17 @@ import java.io.IOException;
/** Server utilities for internal use. */
public class ServerImplUtils {
public static RaftServer newRaftServer(
- RaftPeerId id, StateMachine stateMachine,
- Iterable<RaftPeer> peers, RaftProperties properties) throws IOException {
+ RaftPeerId id, StateMachine stateMachine, Iterable<RaftPeer> peers,
+ RaftProperties properties, Parameters parameters) throws IOException {
return newRaftServer(id, stateMachine,
RaftConfiguration.newBuilder().setConf(peers).build(),
- properties);
+ properties, parameters);
}
public static RaftServerImpl newRaftServer(
- RaftPeerId id, StateMachine stateMachine,
- RaftConfiguration conf, RaftProperties properties) throws IOException {
- return new RaftServerImpl(id, stateMachine, conf, properties);
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties, Parameters parameters) throws IOException {
+ return new RaftServerImpl(id, stateMachine, conf, properties, parameters);
}
public static TermIndex newTermIndex(long term, long index) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 9c566a9..6d14c24 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -20,6 +20,7 @@ package org.apache.ratis;
import com.google.common.base.Preconditions;
import org.apache.ratis.client.ClientFactory;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -42,7 +43,9 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MIN_MS_DEFAULT;
@@ -59,22 +62,16 @@ public abstract class MiniRaftCluster {
public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
public abstract CLUSTER newCluster(
- String[] ids, RaftProperties prop, boolean formatted);
+ String[] ids, RaftProperties prop);
public CLUSTER newCluster(int numServer, RaftProperties prop) {
- return newCluster(generateIds(numServer, 0), prop, true);
+ return newCluster(generateIds(numServer, 0), prop);
}
}
public static abstract class RpcBase extends MiniRaftCluster {
- public RpcBase(String[] ids, RaftProperties properties, boolean formatted) {
- super(ids, properties, formatted);
- }
-
- @Override
- public void restartServer(String id, boolean format) throws IOException {
- super.restartServer(id, format);
- getServer(id).start();
+ public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) {
+ super(ids, properties, parameters);
}
@Override
@@ -82,12 +79,6 @@ public abstract class MiniRaftCluster {
RaftTestUtil.setBlockRequestsFrom(src, block);
}
- public static int getPort(RaftServerImpl server) {
- final int port = getPort(server.getId(), server.getState().getRaftConf());
- LOG.info(server.getId() + "(" + server.getRpcType() + "), port=" + port);
- return port;
- }
-
public static int getPort(RaftPeerId id, RaftConfiguration conf) {
final RaftPeer peer = conf.getPeer(id);
final String address = peer != null? peer.getAddress(): null;
@@ -144,27 +135,46 @@ public abstract class MiniRaftCluster {
protected final ClientFactory clientFactory;
protected RaftConfiguration conf;
protected final RaftProperties properties;
+ protected final Parameters parameters;
private final String testBaseDir;
- protected final Map<RaftPeerId, RaftServerImpl> servers =
- Collections.synchronizedMap(new LinkedHashMap<>());
+ protected final Map<RaftPeerId, RaftServerImpl> servers = new ConcurrentHashMap<>();
- public MiniRaftCluster(String[] ids, RaftProperties properties,
- boolean formatted) {
+ protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
this.conf = initConfiguration(ids);
this.properties = new RaftProperties(properties);
+ this.parameters = parameters;
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
- this.clientFactory = ClientFactory.cast(rpcType.newFactory(properties));
+ this.clientFactory = ClientFactory.cast(
+ rpcType.newFactory(properties, parameters));
this.testBaseDir = getBaseDirectory();
- conf.getPeers().forEach(
- p -> servers.put(p.getId(), newRaftServer(p.getId(), formatted)));
-
ExitUtils.disableSystemExit();
}
+ public MiniRaftCluster initServers() {
+ if (servers.isEmpty()) {
+ putNewServers(RaftUtils.as(conf.getPeers(), RaftPeer::getId), true);
+ }
+ return this;
+ }
+
+ private RaftServerImpl putNewServer(RaftPeerId id, boolean format) {
+ final RaftServerImpl s = newRaftServer(id, format);
+ Preconditions.checkState(servers.put(id, s) == null);
+ return s;
+ }
+
+ private Collection<RaftServerImpl> putNewServers(
+ Iterable<RaftPeerId> peers, boolean format) {
+ return StreamSupport.stream(peers.spliterator(), false)
+ .map(id -> putNewServer(id, format))
+ .collect(Collectors.toList());
+ }
+
public void start() {
LOG.info("Starting " + getClass().getSimpleName());
+ initServers();
servers.values().forEach(RaftServerImpl::start);
}
@@ -175,24 +185,19 @@ public abstract class MiniRaftCluster {
final RaftPeerId newId = new RaftPeerId(id);
killServer(newId);
servers.remove(newId);
- servers.put(newId, newRaftServer(newId, format));
+
+ startServer(putNewServer(newId, format), true);
}
- public final void restart(boolean format) throws IOException {
+ public void restart(boolean format) throws IOException {
servers.values().stream().filter(RaftServerImpl::isAlive)
.forEach(RaftServerImpl::close);
List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
- for (RaftPeerId id : idList) {
- servers.remove(id);
- servers.put(id, newRaftServer(id, format));
- }
-
- initRpc();
+ servers.clear();
+ putNewServers(idList, format);
start();
}
- protected void initRpc() {}
-
public int getMaxTimeout() {
return properties.getInt(
RaftServerConfigKeys.RAFT_SERVER_RPC_TIMEOUT_MAX_MS_KEY,
@@ -203,7 +208,7 @@ public abstract class MiniRaftCluster {
return conf;
}
- protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
+ private RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
try {
final String dirStr = testBaseDir + id;
if (format) {
@@ -212,12 +217,16 @@ public abstract class MiniRaftCluster {
final RaftProperties prop = new RaftProperties(properties);
prop.set(RaftServerConfigKeys.RAFT_SERVER_STORAGE_DIR_KEY, dirStr);
final StateMachine stateMachine = getStateMachine4Test(properties);
- return ServerImplUtils.newRaftServer(id, stateMachine, conf, prop);
+ return newRaftServer(id, stateMachine, conf, prop);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
+ protected abstract RaftServerImpl newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties) throws IOException;
+
static StateMachine getStateMachine4Test(RaftProperties properties) {
final Class<? extends StateMachine> smClass = properties.getClass(
STATEMACHINE_CLASS_KEY,
@@ -229,17 +238,12 @@ public abstract class MiniRaftCluster {
public static Collection<RaftPeer> toRaftPeers(
Collection<RaftServerImpl> servers) {
return servers.stream()
- .map(s -> new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()))
+ .map(MiniRaftCluster::toRaftPeer)
.collect(Collectors.toList());
}
- protected Collection<RaftPeer> addNewPeers(
- Collection<RaftServerImpl> newServers, boolean startService) {
- final Collection<RaftPeer> peers = toRaftPeers(newServers);
- if (startService) {
- newServers.forEach(RaftServerImpl::start);
- }
- return peers;
+ public static RaftPeer toRaftPeer(RaftServerImpl s) {
+ return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
}
public PeerChanges addNewPeers(int number, boolean startNewPeer)
@@ -252,18 +256,11 @@ public abstract class MiniRaftCluster {
LOG.info("Add new peers {}", Arrays.asList(ids));
// create and add new RaftServers
- final List<RaftServerImpl> newServers = new ArrayList<>(ids.length);
- for (String id : ids) {
- Preconditions.checkArgument(!servers.containsKey(id));
-
- final RaftPeerId peerId = new RaftPeerId(id);
- final RaftServerImpl newServer = newRaftServer(peerId, true);
- servers.put(peerId, newServer);
- newServers.add(newServer);
- }
-
- final Collection<RaftPeer> newPeers = addNewPeers(newServers, startNewPeer);
+ final Collection<RaftServerImpl> newServers = putNewServers(
+ RaftUtils.as(Arrays.asList(ids), RaftPeerId::new), true);
+ newServers.forEach(s -> startServer(s, startNewPeer));
+ final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
newPeers.addAll(conf.getPeers());
conf = RaftConfiguration.newBuilder().setConf(newPeers).setLogEntryIndex(0).build();
@@ -271,14 +268,14 @@ public abstract class MiniRaftCluster {
return new PeerChanges(p, np, new RaftPeer[0]);
}
- public void startServer(RaftPeerId id) {
- RaftServerImpl server = servers.get(id);
- assert server != null;
- server.start();
+ protected void startServer(RaftServerImpl server, boolean startService) {
+ if (startService) {
+ server.start();
+ }
}
- private RaftPeer getPeer(RaftServerImpl s) {
- return new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress());
+ public void startServer(RaftPeerId id) {
+ startServer(getServer(id), true);
}
/**
@@ -289,7 +286,7 @@ public abstract class MiniRaftCluster {
Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
List<RaftPeer> removedPeers = new ArrayList<>(number);
if (removeLeader) {
- final RaftPeer leader = getPeer(getLeader());
+ final RaftPeer leader = toRaftPeer(getLeader());
assert !excluded.contains(leader);
peers.remove(leader);
removedPeers.add(leader);
@@ -297,7 +294,7 @@ public abstract class MiniRaftCluster {
List<RaftServerImpl> followers = getFollowers();
for (int i = 0, removed = 0; i < followers.size() &&
removed < (removeLeader ? number - 1 : number); i++) {
- RaftPeer toRemove = getPeer(followers.get(i));
+ RaftPeer toRemove = toRaftPeer(followers.get(i));
if (!excluded.contains(toRemove)) {
peers.remove(toRemove);
removedPeers.add(toRemove);
@@ -381,13 +378,15 @@ public abstract class MiniRaftCluster {
}
public RaftServerImpl getServer(String id) {
- return servers.get(new RaftPeerId(id));
+ return getServer(new RaftPeerId(id));
+ }
+
+ public RaftServerImpl getServer(RaftPeerId id) {
+ return servers.get(id);
}
public Collection<RaftPeer> getPeers() {
- return getServers().stream().map(s ->
- new RaftPeer(s.getId(), s.getServerRpc().getInetSocketAddress()))
- .collect(Collectors.toList());
+ return toRaftPeers(getServers());
}
public RaftClient createClient(RaftPeerId leaderId) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 3017634..e3db854 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -583,7 +583,7 @@ public abstract class RaftReconfigurationBaseTest {
@Test
public void testLeaderNotReadyException() throws Exception {
LOG.info("Start testLeaderNotReadyException");
- final MiniRaftCluster cluster = getCluster(1);
+ final MiniRaftCluster cluster = getCluster(1).initServers();
final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
try {
// delay 1s for each logSync call
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index e33d64f..3ed2596 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -19,16 +19,17 @@ package org.apache.ratis.server.simulation;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
@@ -38,76 +39,52 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
= new Factory<MiniRaftClusterWithSimulatedRpc>() {
@Override
public MiniRaftClusterWithSimulatedRpc newCluster(
- String[] ids, RaftProperties prop, boolean formatted) {
+ String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop::set, SimulatedRpc.INSTANCE);
if (ThreadLocalRandom.current().nextBoolean()) {
// turn off simulate latency half of the times.
prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);
}
- return new MiniRaftClusterWithSimulatedRpc(ids, prop, formatted);
- }
- };
-
- private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
- private SimulatedClientRpc client2serverRequestReply;
-
- private MiniRaftClusterWithSimulatedRpc(String[] ids,
- RaftProperties properties, boolean formatted) {
- super(ids, properties, formatted);
- initRpc();
- }
-
- @Override
- protected void initRpc() {
- final int simulateLatencyMs = properties.getInt(
- SimulatedRequestReply.SIMULATE_LATENCY_KEY,
- SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT);
- LOG.info(SimulatedRequestReply.SIMULATE_LATENCY_KEY + " = "
- + simulateLatencyMs);
- serverRequestReply = new SimulatedRequestReply<>(simulateLatencyMs);
- client2serverRequestReply = new SimulatedClientRpc(simulateLatencyMs);
- getServers().stream().forEach(s -> initRpc(s));
- addPeersToRpc(toRaftPeers(getServers()));
- ((SimulatedRpc.Factory)clientFactory).initRpc(
- serverRequestReply, client2serverRequestReply);
- }
-
- private void initRpc(RaftServerImpl s) {
- if (serverRequestReply != null) {
- ((SimulatedRpc.Factory)s.getFactory()).initRpc(
+ final int simulateLatencyMs = ConfUtils.getInt(prop::getInt,
+ SimulatedRequestReply.SIMULATE_LATENCY_KEY,
+ SimulatedRequestReply.SIMULATE_LATENCY_DEFAULT, 0, null);
+ final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply
+ = new SimulatedRequestReply<>(simulateLatencyMs);
+ final SimulatedClientRpc client2serverRequestReply
+ = new SimulatedClientRpc(simulateLatencyMs);
+ return new MiniRaftClusterWithSimulatedRpc(ids, prop,
serverRequestReply, client2serverRequestReply);
}
- }
-
- private void addPeersToRpc(Collection<RaftPeer> peers) {
- serverRequestReply.addPeers(peers);
- client2serverRequestReply.addPeers(peers);
- }
+ };
- @Override
- protected RaftServerImpl newRaftServer(RaftPeerId id, boolean format) {
- final RaftServerImpl s = super.newRaftServer(id, format);
- initRpc(s);
- return s;
+ private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
+ private final SimulatedClientRpc client2serverRequestReply;
+
+ private MiniRaftClusterWithSimulatedRpc(
+ String[] ids, RaftProperties properties,
+ SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
+ SimulatedClientRpc client2serverRequestReply) {
+ super(ids, properties,
+ SimulatedRpc.Factory.newRaftParameters(serverRequestReply, client2serverRequestReply));
+ this.serverRequestReply = serverRequestReply;
+ this.client2serverRequestReply = client2serverRequestReply;
}
@Override
- public void restartServer(String id, boolean format) throws IOException {
- super.restartServer(id, format);
- RaftServerImpl s = getServer(id);
- addPeersToRpc(Collections.singletonList(conf.getPeer(new RaftPeerId(id))));
- s.start();
+ public void restart(boolean format) throws IOException {
+ serverRequestReply.clear();
+ client2serverRequestReply.clear();
+ super.restart(format);
}
@Override
- public Collection<RaftPeer> addNewPeers(
- Collection<RaftServerImpl> newServers, boolean startService) {
- final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
- addPeersToRpc(newPeers);
- if (startService) {
- newServers.forEach(RaftServerImpl::start);
- }
- return newPeers;
+ protected RaftServerImpl newRaftServer(
+ RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
+ RaftProperties properties) throws IOException {
+ serverRequestReply.addPeer(id);
+ client2serverRequestReply.addPeer(id);
+ return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
+ parameters);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
index 95d3efa..64aaeac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.simulation;
import com.google.common.base.Preconditions;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.RaftUtils;
@@ -175,10 +176,12 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage,
queues.remove(id);
}
- public void addPeers(Collection<RaftPeer> newPeers) {
- for (RaftPeer peer : newPeers) {
- queues.put(peer.getId().toString(), new EventQueue<>());
- }
+ public void clear() {
+ queues.clear();
+ }
+
+ public void addPeer(RaftPeerId newPeer) {
+ queues.put(newPeer.toString(), new EventQueue<>());
}
private void simulateLatency() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/46116d41/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 67193b4..69707aa 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -18,8 +18,8 @@
package org.apache.ratis.server.simulation;
import org.apache.ratis.client.ClientFactory;
+import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.rpc.RpcFactory;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
@@ -35,19 +35,31 @@ class SimulatedRpc implements RpcType {
}
@Override
- public RpcFactory newFactory(RaftProperties properties) {
- return new Factory();
+ public Factory newFactory(RaftProperties properties, Parameters parameters) {
+ return new Factory(parameters);
}
static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
- private SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
- private SimulatedClientRpc client2serverRequestReply;
+ static String SERVER_REQUEST_REPLY_KEY = "raft.simulated.serverRequestReply";
+ static String CLIENT_TO_SERVER_REQUEST_REPLY_KEY = "raft.simulated.client2serverRequestReply";
- public void initRpc(
- SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply,
- SimulatedClientRpc client2serverRequestReply) {
- this.serverRequestReply = Objects.requireNonNull(serverRequestReply);
- this.client2serverRequestReply = Objects.requireNonNull(client2serverRequestReply);
+ static Parameters newRaftParameters(
+ SimulatedRequestReply<RaftServerRequest, RaftServerReply> server,
+ SimulatedClientRpc client2server) {
+ final Parameters p = new Parameters();
+ p.put(SERVER_REQUEST_REPLY_KEY, server, SimulatedRequestReply.class);
+ p.put(CLIENT_TO_SERVER_REQUEST_REPLY_KEY, client2server, SimulatedClientRpc.class);
+ return p;
+ }
+
+ private final SimulatedRequestReply<RaftServerRequest, RaftServerReply> serverRequestReply;
+ private final SimulatedClientRpc client2serverRequestReply;
+
+ Factory(Parameters parameters) {
+ serverRequestReply = parameters.getNonNull(
+ SERVER_REQUEST_REPLY_KEY, SimulatedRequestReply.class);
+ client2serverRequestReply = parameters.getNonNull(
+ CLIENT_TO_SERVER_REQUEST_REPLY_KEY, SimulatedClientRpc.class);
}
@Override