You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/03/08 23:18:21 UTC
incubator-ratis git commit: RATIS-36. Use ConfUtils for other
ConfigKeys. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master b5a07be08 -> 4e5d0d795
RATIS-36. Use ConfUtils for other ConfigKeys. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4e5d0d79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4e5d0d79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4e5d0d79
Branch: refs/heads/master
Commit: 4e5d0d7952484b7ca880b7ce4b7e9f557ba08654
Parents: b5a07be
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Mar 8 15:18:11 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Mar 8 15:18:11 2017 -0800
----------------------------------------------------------------------
ratis-client/pom.xml | 1 -
.../org/apache/ratis/client/RaftClient.java | 7 +-
.../ratis/client/RaftClientConfigKeys.java | 26 ++-
.../ratis/client/impl/ClientImplUtils.java | 3 +-
.../ratis/client/impl/RaftClientImpl.java | 7 +-
.../java/org/apache/ratis/RaftConfigKeys.java | 15 +-
.../java/org/apache/ratis/conf/ConfUtils.java | 108 ++++++++++---
.../org/apache/ratis/conf/RaftProperties.java | 99 +++---------
.../apache/ratis/util/CheckedBiConsumer.java | 30 ++++
.../java/org/apache/ratis/util/SizeInBytes.java | 82 ++++++++++
.../org/apache/ratis/util/TimeDuration.java | 141 ++++++++++++++++
.../ratis/util/TraditionalBinaryPrefix.java | 162 +++++++++++++++++++
.../org/apache/ratis/util/TestTimeDuration.java | 84 ++++++++++
.../ratis/util/TestTraditionalBinaryPrefix.java | 145 +++++++++++++++++
ratis-grpc/pom.xml | 2 -
.../org/apache/ratis/grpc/GrpcConfigKeys.java | 96 +++++++++++
.../org/apache/ratis/grpc/RaftGRpcService.java | 8 +-
.../apache/ratis/grpc/RaftGrpcConfigKeys.java | 47 ------
.../ratis/grpc/client/AppendStreamer.java | 34 ++--
.../ratis/grpc/client/RaftOutputStream.java | 17 +-
.../ratis/grpc/server/GRpcLogAppender.java | 6 +-
.../ratis/grpc/MiniRaftClusterWithGRpc.java | 4 +-
.../org/apache/ratis/grpc/TestRaftStream.java | 12 +-
ratis-hadoop/pom.xml | 1 -
.../ratis/hadooprpc/HadoopConfigKeys.java | 37 ++---
.../apache/ratis/hadooprpc/HadoopFactory.java | 2 +-
.../hadooprpc/server/HadoopRpcService.java | 4 +-
.../hadooprpc/MiniRaftClusterWithHadoopRpc.java | 6 +-
.../org/apache/ratis/netty/NettyConfigKeys.java | 18 +--
.../ratis/netty/server/NettyRpcService.java | 2 +-
.../ratis/netty/MiniRaftClusterWithNetty.java | 4 +-
.../MiniRaftClusterWithSimulatedRpc.java | 2 +-
32 files changed, 958 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-client/pom.xml b/ratis-client/pom.xml
index 4c3dd58..7a070a9 100644
--- a/ratis-client/pom.xml
+++ b/ratis-client/pom.xml
@@ -37,7 +37,6 @@
<dependency>
<artifactId>ratis-common</artifactId>
<groupId>org.apache.ratis</groupId>
- <scope>provided</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index bf85386..8f3c465 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client;
import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,7 +64,7 @@ public interface RaftClient extends Closeable {
private Collection<RaftPeer> servers;
private RaftPeerId leaderId;
private RaftProperties properties;
- private int retryInterval = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT;
+ private TimeDuration retryInterval = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
private Builder() {}
@@ -73,9 +74,7 @@ public interface RaftClient extends Closeable {
clientId = ClientId.createId();
}
if (properties != null) {
- retryInterval = properties.getInt(
- RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_KEY,
- RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT);
+ retryInterval = RaftClientConfigKeys.Rpc.timeout(properties);
}
return ClientImplUtils.newRaftClient(clientId,
Objects.requireNonNull(servers, "The 'servers' field is not initialized."),
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index e1e1593..9e7bd76 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -17,7 +17,29 @@
*/
package org.apache.ratis.client;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.ratis.conf.ConfUtils.*;
+
public interface RaftClientConfigKeys {
- String RAFT_RPC_TIMEOUT_MS_KEY = "raft.rpc.timeout.ms";
- int RAFT_RPC_TIMEOUT_MS_DEFAULT = 300;
+ String PREFIX = "raft.client";
+
+ interface Rpc {
+ String PREFIX = RaftClientConfigKeys.PREFIX + ".rpc";
+
+ String TIMEOUT_KEY = PREFIX + ".timeout";
+ TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(300, TimeUnit.MILLISECONDS);
+
+ static TimeDuration timeout(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
+ TIMEOUT_KEY, TIMEOUT_DEFAULT);
+ }
+ }
+
+ static void main(String[] args) {
+ printAll(RaftClientConfigKeys.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
index 85901db..882aa41 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -29,7 +30,7 @@ import java.util.Collection;
public class ClientImplUtils {
public static RaftClient newRaftClient(
ClientId clientId, Collection<RaftPeer> peers, RaftPeerId leaderId,
- RaftClientRpc clientRpc, int retryInterval) {
+ RaftClientRpc clientRpc, TimeDuration retryInterval) {
return new RaftClientImpl(clientId, peers, leaderId, clientRpc, retryInterval);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index be22305..4e5db47 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.protocol.*;
import org.apache.ratis.util.RaftUtils;
@@ -40,13 +41,13 @@ final class RaftClientImpl implements RaftClient {
private final ClientId clientId;
private final RaftClientRpc clientRpc;
private final Collection<RaftPeer> peers;
- private final int retryInterval;
+ private final TimeDuration retryInterval;
private volatile RaftPeerId leaderId;
RaftClientImpl(ClientId clientId, Collection<RaftPeer> peers,
RaftPeerId leaderId, RaftClientRpc clientRpc,
- int retryInterval) {
+ TimeDuration retryInterval) {
this.clientId = clientId;
this.clientRpc = clientRpc;
this.peers = peers;
@@ -99,7 +100,7 @@ final class RaftClientImpl implements RaftClient {
// sleep and then retry
try {
- Thread.sleep(retryInterval);
+ retryInterval.sleep();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw RaftUtils.toInterruptedIOException(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
index a8bc57d..b67acaf 100644
--- a/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
+++ b/ratis-common/src/main/java/org/apache/ratis/RaftConfigKeys.java
@@ -17,13 +17,12 @@
*/
package org.apache.ratis;
-import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.util.RaftUtils;
-import java.util.function.BiConsumer;
+import static org.apache.ratis.conf.ConfUtils.*;
public interface RaftConfigKeys {
String PREFIX = "raft";
@@ -35,7 +34,7 @@ public interface RaftConfigKeys {
String TYPE_DEFAULT = SupportedRpcType.GRPC.name();
static RpcType type(RaftProperties properties) {
- final String t = ConfUtils.get(properties::get, TYPE_KEY, TYPE_DEFAULT);
+ final String t = get(properties::get, TYPE_KEY, TYPE_DEFAULT);
try { // Try parsing it as a SupportedRpcType
return SupportedRpcType.valueOfIgnoreCase(t);
@@ -47,8 +46,12 @@ public interface RaftConfigKeys {
RaftUtils.getClass(t, properties, RpcType.class));
}
- static void setType(BiConsumer<String, String> setRpcType, RpcType type) {
- ConfUtils.set(setRpcType, TYPE_KEY, type.name());
+ static void setType(RaftProperties properties, RpcType type) {
+ set(properties::set, TYPE_KEY, type.name());
}
}
+
+ static void main(String[] args) {
+ printAll(RaftConfigKeys.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index 34cdbc3..d05c0f8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -17,7 +17,10 @@
*/
package org.apache.ratis.conf;
+import org.apache.ratis.util.CheckedBiConsumer;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,6 +79,26 @@ public interface ConfUtils {
};
}
+ static BiConsumer<String, TimeDuration> requireNonNegativeTimeDuration() {
+ return (key, value) -> {
+ if (value.isNegative()) {
+ throw new IllegalArgumentException(
+ key + " = " + value + " is negative.");
+ }
+ };
+ }
+
+ static BiFunction<String, Long, Integer> requireInt() {
+ return (key, value) -> {
+ try {
+ return Math.toIntExact(value);
+ } catch (ArithmeticException ae) {
+ throw new IllegalArgumentException(
+ "Failed to cast " + key + " = " + value + " to int.", ae);
+ }
+ };
+ }
+
static boolean getBoolean(
BiFunction<String, Boolean, Boolean> booleanGetter,
String key, boolean defaultValue, BiConsumer<String, Boolean>... assertions) {
@@ -94,6 +117,22 @@ public interface ConfUtils {
return get(longGetter, key, defaultValue, assertions);
}
+ static SizeInBytes getSizeInBytes(
+ BiFunction<String, SizeInBytes, SizeInBytes> getter,
+ String key, SizeInBytes defaultValue, BiConsumer<String, SizeInBytes>... assertions) {
+ final SizeInBytes value = get(getter, key, defaultValue, assertions);
+ requireMin(0L).accept(key, value.getSize());
+ return value;
+ }
+
+ static TimeDuration getTimeDuration(
+ BiFunction<String, TimeDuration, TimeDuration> getter,
+ String key, TimeDuration defaultValue, BiConsumer<String, TimeDuration>... assertions) {
+ final TimeDuration value = get(getter, key, defaultValue, assertions);
+ requireNonNegativeTimeDuration().accept(key, value);
+ return value;
+ }
+
static <T> T get(BiFunction<String, T, T> getter,
String key, T defaultValue, BiConsumer<String, T>... assertions) {
final T value = getter.apply(key, defaultValue);
@@ -126,6 +165,22 @@ public interface ConfUtils {
set(longSetter, key, value, assertions);
}
+ static void setSizeInBytes(
+ BiConsumer<String, String> stringSetter, String key, SizeInBytes value,
+ BiConsumer<String, Long>... assertions) {
+ final long v = value.getSize();
+ Arrays.asList(assertions).forEach(a -> a.accept(key, v));
+ set(stringSetter, key, value.getInput());
+ }
+
+ static void setSizeInBytesInt(
+ BiConsumer<String, String> stringSetter, String key, SizeInBytes value,
+ BiConsumer<String, Integer>... assertions) {
+ final int v = value.getSizeInt();
+ Arrays.asList(assertions).forEach(a -> a.accept(key, v));
+ set(stringSetter, key, value.getInput());
+ }
+
static <T> void set(
BiConsumer<String, T> setter, String key, T value,
BiConsumer<String, T>... assertions) {
@@ -152,42 +207,57 @@ public interface ConfUtils {
out.accept("WARNING: Found non-static field " + f);
return;
}
- final String fieldName = f.getName();
- if (!fieldName.endsWith("_KEY")) {
- if (!fieldName.endsWith("_DEFAULT")) {
- try {
- out.accept("constant: " + fieldName + " = " + f.get(null));
- } catch (IllegalAccessException e) {
- out.accept(fieldName + " is not public");
- }
- }
+ if (printKey(confClass, out, f, "KEY", "DEFAULT",
+ (b, defaultField) ->
+ b.append(defaultField.getType().getSimpleName()).append(", ")
+ .append("default=" + defaultField.get(null)))) {
+ return;
+ }
+ if (printKey(confClass, out, f, "PARAMETER", "CLASS",
+ (b, classField) -> b.append(classField.get(null)))) {
return;
}
+ final String fieldName = f.getName();
+ try {
+ out.accept("constant: " + fieldName + " = " + f.get(null));
+ } catch (IllegalAccessException e) {
+ out.accept("WARNING: Failed to access " + f);
+ }
+ }
+ static boolean printKey(
+ Class<?> confClass, Consumer<Object> out, Field f, String KEY, String DEFAULT,
+ CheckedBiConsumer<StringBuilder, Field, IllegalAccessException> processDefault) {
+ final String fieldName = f.getName();
+ if (fieldName.endsWith("_" + DEFAULT)) {
+ return true;
+ }
+ if (!fieldName.endsWith("_" + KEY)) {
+ return false;
+ }
final StringBuilder b = new StringBuilder();
try {
final Object keyName = f.get(null);
- b.append("key: ").append(keyName);
+ b.append(KEY.toLowerCase()).append(": ").append(keyName);
} catch (IllegalAccessException e) {
- out.accept("WARNING: Failed to access key " + f);
+ out.accept("WARNING: Failed to access " + fieldName);
b.append(fieldName + " is not public");
}
- final String defaultFieldName = fieldName.substring(0, fieldName.length() - 4) + "_DEFAULT";
+ final int len = fieldName.length() - KEY.length();
+ final String defaultFieldName = fieldName.substring(0, len) + DEFAULT;
b.append(" (");
try {
final Field defaultField = confClass.getDeclaredField(defaultFieldName);
- b.append(defaultField.getType().getSimpleName()).append(", ");
-
- final Object defaultValue = defaultField.get(null);
- b.append("default=").append(defaultValue);
+ processDefault.accept(b, defaultField);
} catch (NoSuchFieldException e) {
- out.accept("WARNING: Default value not found for field " + f);
- b.append("default not found");
+ out.accept("WARNING: " + DEFAULT + " not found for field " + f);
+ b.append(DEFAULT).append(" not found");
} catch (IllegalAccessException e) {
- out.accept("WARNING: Failed to access default value " + f);
+ out.accept("WARNING: Failed to access " + defaultFieldName);
b.append(defaultFieldName).append(" is not public");
}
b.append(")");
out.accept(b);
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
index fc93398..ea917d1 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/RaftProperties.java
@@ -20,7 +20,9 @@ package org.apache.ratis.conf;
import com.google.common.base.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.StringUtils;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.*;
@@ -44,6 +46,7 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -639,6 +642,12 @@ public class RaftProperties {
return Long.parseLong(valueString);
}
+ /** @return property value; if it is not set, return the default value. */
+ public SizeInBytes getSizeInBytes(String name, SizeInBytes defaultValue) {
+ final String valueString = getTrimmed(name);
+ return valueString == null? defaultValue: SizeInBytes.valueOf(valueString);
+ }
+
private String getHexDigits(String value) {
boolean negative = false;
String str = value;
@@ -784,65 +793,14 @@ public class RaftProperties {
: Enum.valueOf(defaultValue.getDeclaringClass(), val);
}
- enum ParsedTimeDuration {
- NS {
- TimeUnit unit() { return TimeUnit.NANOSECONDS; }
- String suffix() { return "ns"; }
- },
- US {
- TimeUnit unit() { return TimeUnit.MICROSECONDS; }
- String suffix() { return "us"; }
- },
- MS {
- TimeUnit unit() { return TimeUnit.MILLISECONDS; }
- String suffix() { return "ms"; }
- },
- S {
- TimeUnit unit() { return TimeUnit.SECONDS; }
- String suffix() { return "s"; }
- },
- M {
- TimeUnit unit() { return TimeUnit.MINUTES; }
- String suffix() { return "m"; }
- },
- H {
- TimeUnit unit() { return TimeUnit.HOURS; }
- String suffix() { return "h"; }
- },
- D {
- TimeUnit unit() { return TimeUnit.DAYS; }
- String suffix() { return "d"; }
- };
- abstract TimeUnit unit();
- abstract String suffix();
- static ParsedTimeDuration unitFor(String s) {
- for (ParsedTimeDuration ptd : values()) {
- // iteration order is in decl order, so SECONDS matched last
- if (s.endsWith(ptd.suffix())) {
- return ptd;
- }
- }
- return null;
- }
- static ParsedTimeDuration unitFor(TimeUnit unit) {
- for (ParsedTimeDuration ptd : values()) {
- if (ptd.unit() == unit) {
- return ptd;
- }
- }
- return null;
- }
- }
-
/**
* Set the value of <code>name</code> to the given time duration. This
* is equivalent to <code>set(<name>, value + <time suffix>)</code>.
* @param name Property name
* @param value Time duration
- * @param unit Unit of time
*/
- public void setTimeDuration(String name, long value, TimeUnit unit) {
- set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
+ public void setTimeDuration(String name, TimeDuration value) {
+ set(name, value.toString());
}
/**
@@ -851,37 +809,24 @@ public class RaftProperties {
* (ms), seconds (s), minutes (m), hours (h), and days (d).
* @param name Property name
* @param defaultValue Value returned if no mapping exists.
- * @param unit Unit to convert the stored property, if it exists.
* @throws NumberFormatException If the property stripped of its unit is not
* a number
*/
- public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
- String vStr = get(name);
- if (null == vStr) {
+ public TimeDuration getTimeDuration(
+ String name, TimeDuration defaultValue, TimeUnit defaultUnit) {
+ final String value = getTrimmed(name);
+ if (null == value) {
return defaultValue;
}
- vStr = vStr.trim();
- return getTimeDurationHelper(name, vStr, unit);
- }
-
- private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
- ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
- if (null == vUnit) {
- LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
- vUnit = ParsedTimeDuration.unitFor(unit);
- } else {
- vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix()));
+ try {
+ return TimeDuration.valueOf(value, defaultUnit);
+ } catch(NumberFormatException e) {
+ throw new IllegalArgumentException("Failed to parse "
+ + name + " = " + value, e);
}
- return unit.convert(Long.parseLong(vStr), vUnit.unit());
}
-
- public long[] getTimeDurations(String name, TimeUnit unit) {
- String[] strings = getTrimmedStrings(name);
- long[] durations = new long[strings.length];
- for (int i = 0; i < strings.length; i++) {
- durations[i] = getTimeDurationHelper(name, strings[i], unit);
- }
- return durations;
+ public BiFunction<String, TimeDuration, TimeDuration> getTimeDuration(TimeUnit defaultUnit) {
+ return (key, defaultValue) -> getTimeDuration(key, defaultValue, defaultUnit);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java b/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java
new file mode 100644
index 0000000..03256b2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CheckedBiConsumer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.function.BiConsumer;
+
+/** {@link BiConsumer} with a throws-clause. */
+@FunctionalInterface
+public interface CheckedBiConsumer<LEFT, RIGHT, THROWABLE extends Throwable> {
+ /**
+ * The same as {@link BiConsumer#accept(Object, Object)}
+ * except that this method is declared with a throws-clause.
+ */
+ void accept(LEFT left, RIGHT right) throws THROWABLE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
new file mode 100644
index 0000000..d5306c5
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.Objects;
+
+/**
+ * Size which may be constructed with a {@link TraditionalBinaryPrefix}.
+ */
+public class SizeInBytes {
+ public static SizeInBytes valueOf(long size) {
+ final String s = String.valueOf(size);
+ return new SizeInBytes(size, s, s);
+ }
+
+ public static SizeInBytes valueOf(long n, TraditionalBinaryPrefix prefix) {
+ final long size = Objects.requireNonNull(prefix, "prefix = null").toLong(n);
+ final String input = n + " " + prefix.getSymbol();
+ final String description = input + " (=" + size + ")";
+ return new SizeInBytes(size, input, description);
+ }
+
+ public static SizeInBytes valueOf(String input) {
+ input = Objects.requireNonNull(input, "input = null").trim();
+
+ final int last = input.length() - 1;
+ final String s = "b".equalsIgnoreCase(input.substring(last))?
+ input.substring(0, last): input;
+ final long size;
+ try {
+ size = TraditionalBinaryPrefix.string2long(s);
+ } catch(NumberFormatException e) {
+ throw new IllegalArgumentException("Failed to parse input " + input, e);
+ }
+ final String description = input.equals(String.valueOf(size))?
+ input: input + " (=" + size + ")";
+
+ return new SizeInBytes(size, input, description);
+ }
+
+ private final long size;
+ private final String input;
+ private final String description;
+
+ private SizeInBytes(long size, String input, String description) {
+ this.size = size;
+ this.input = input;
+ this.description = description;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public int getSizeInt() {
+ return Math.toIntExact(getSize());
+ }
+
+ public String getInput() {
+ return input;
+ }
+
+ @Override
+ public String toString() {
+ return description;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
new file mode 100644
index 0000000..d57c115
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Time duration is represented together with a {@link TimeUnit}.
+ */
+public class TimeDuration implements Comparable<TimeDuration> {
+
+ public enum Abbreviation {
+ NANOSECONDS("ns", "nanos"),
+ MICROSECONDS("us", "\u03bcs", "micros"),
+ MILLISECONDS("ms", "msec", "millis"),
+ SECONDS("s", "sec"),
+ MINUTES("m", "min"),
+ HOURS("h", "hr"),
+ DAYS("d");
+
+ private final TimeUnit unit = TimeUnit.valueOf(name());
+ private final List<String> symbols;
+
+ Abbreviation(String... symbols) {
+ final List<String> input = Arrays.asList(symbols);
+ final List<String> all = new ArrayList<>(input.size() + 2);
+ input.forEach(s -> all.add(s.toLowerCase()));
+
+ final String s = unit.name().toLowerCase();
+ all.add(s);
+ all.add(s.substring(0, s.length() - 1));
+
+ this.symbols = Collections.unmodifiableList(all);
+ }
+
+ public TimeUnit unit() {
+ return unit;
+ }
+
+ String getDefault() {
+ return symbols.get(0);
+ }
+
+ public List<String> getSymbols() {
+ return symbols;
+ }
+
+ public static Abbreviation valueOf(TimeUnit unit) {
+ return valueOf(unit.name());
+ }
+ }
+
+ public static long parse(String timeString, TimeUnit targetUnit) {
+ return valueOf(timeString, targetUnit).toLong(targetUnit);
+ }
+
+ /**
+ * Parse the given time duration string.
+ * If there is no unit specified, use the default unit.
+ */
+ public static TimeDuration valueOf(String timeString, TimeUnit defaultUnit) {
+ final String lower = Objects.requireNonNull(timeString, "timeString = null").trim();
+ for(Abbreviation a : Abbreviation.values()) {
+ for(String s : a.getSymbols()) {
+ if (lower.endsWith(s)) {
+ final String value = lower.substring(0, lower.length()-s.length()).trim();
+ try {
+ return valueOf(Long.parseLong(value), a.unit());
+ } catch(NumberFormatException e) {
+ // failed with current symbol; ignore and try next symbol.
+ }
+ }
+ }
+ }
+ return valueOf(Long.parseLong(lower), defaultUnit);
+ }
+
+ public static TimeDuration valueOf(long duration, TimeUnit unit) {
+ return new TimeDuration(duration, unit);
+ }
+
+ private final long duration;
+ private final TimeUnit unit;
+
+ private TimeDuration(long duration, TimeUnit unit) {
+ this.duration = duration;
+ this.unit = Objects.requireNonNull(unit, "unit = null");
+ }
+
+ public TimeUnit getUnit() {
+ return unit;
+ }
+
+ public long toLong(TimeUnit targetUnit) {
+ return targetUnit.convert(duration, unit);
+ }
+
+ public boolean isNegative() {
+ return duration < 0;
+ }
+
+ public void sleep() throws InterruptedException {
+ unit.sleep(duration);
+ }
+
+ @Override
+ public int compareTo(TimeDuration that) {
+ if (this.unit.compareTo(that.unit) > 0) {
+ return that.compareTo(this);
+ }
+ // this.unit <= that.unit
+ final long thisDurationInThatUnit = that.unit.convert(this.duration, this.unit);
+ if (thisDurationInThatUnit == that.duration) {
+ final long thatDurationInThisUnit = this.unit.convert(that.duration, that.unit);
+ return Long.compare(this.duration, thatDurationInThisUnit);
+ } else {
+ return Long.compare(thisDurationInThatUnit, that.duration);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return duration + " " + Abbreviation.valueOf(unit).getDefault();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java b/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java
new file mode 100644
index 0000000..d9677c8
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TraditionalBinaryPrefix.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+/**
+ * The traditional binary prefixes, kilo, mega, ..., exa,
+ * which can be represented by a 64-bit integer.
+ * {@link TraditionalBinaryPrefix} symbols are case insensitive.
+ */
+public enum TraditionalBinaryPrefix {
+ KILO(10),
+ MEGA(KILO.bitShift + 10),
+ GIGA(MEGA.bitShift + 10),
+ TERA(GIGA.bitShift + 10),
+ PETA(TERA.bitShift + 10),
+ EXA (PETA.bitShift + 10);
+
+ private final long value;
+ private final char symbol;
+ private final int bitShift;
+ private final long bitMask;
+
+ TraditionalBinaryPrefix(int bitShift) {
+ this.bitShift = bitShift;
+ this.value = 1L << bitShift;
+ this.bitMask = this.value - 1L;
+ this.symbol = toString().charAt(0);
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ public char getSymbol() {
+ return symbol;
+ }
+
+ public long toLong(long n) {
+ final long shifted = n << bitShift;
+ if (n != shifted >>> bitShift) {
+ throw new ArithmeticException("Long overflow: " + toString(n)
+ + " cannot be assigned to a long.");
+ }
+ return shifted;
+ }
+
+ public String toString(long n) {
+ return n + String.valueOf(symbol);
+ }
+
+ /**
+ * @return The object corresponding to the symbol.
+ */
+ public static TraditionalBinaryPrefix valueOf(char symbol) {
+ symbol = Character.toUpperCase(symbol);
+ for(TraditionalBinaryPrefix prefix : TraditionalBinaryPrefix.values()) {
+ if (symbol == prefix.symbol) {
+ return prefix;
+ }
+ }
+ throw new IllegalArgumentException("Unknown symbol '" + symbol + "'");
+ }
+
+ /**
+ * Convert a string to long.
+ * The input string is first be trimmed
+ * and then it is parsed with traditional binary prefix.
+ *
+ * For example,
+ * "-1230k" will be converted to -1230 * 1024 = -1259520;
+ * "891g" will be converted to 891 * 1024^3 = 956703965184;
+ *
+ * @param s input string
+ * @return a long value represented by the input string.
+ */
+ public static long string2long(String s) {
+ s = s.trim();
+ final int lastpos = s.length() - 1;
+ final char lastchar = s.charAt(lastpos);
+ if (Character.isDigit(lastchar))
+ return Long.parseLong(s);
+ else {
+ long p;
+ try {
+ p = TraditionalBinaryPrefix.valueOf(lastchar).value;
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid size prefix '" + lastchar
+ + "' in '" + s + "'. Allowed prefixes are k, m, g, t, p, e (case insensitive)");
+ }
+ long num = Long.parseLong(s.substring(0, lastpos).trim());
+ if (num > Long.MAX_VALUE/p || num < Long.MIN_VALUE/p) {
+ throw new IllegalArgumentException(s + " does not fit in a Long");
+ }
+ return num * p;
+ }
+ }
+
+ /**
+ * Convert a long integer to a string with traditional binary prefix.
+ *
+ * @param n the value to be converted
+ * @param unit The unit, e.g. "B" for bytes.
+ * @param decimalPlaces The number of decimal places.
+ * @return a string with traditional binary prefix.
+ */
+ public static String long2String(long n, String unit, int decimalPlaces) {
+ if (unit == null) {
+ unit = "";
+ }
+ //take care a special case
+ if (n == Long.MIN_VALUE) {
+ return "-8 " + EXA.symbol + unit;
+ }
+
+ final StringBuilder b = new StringBuilder();
+ //take care negative numbers
+ if (n < 0) {
+ b.append('-');
+ n = -n;
+ }
+ if (n < KILO.value) {
+ //no prefix
+ b.append(n);
+ return (unit.isEmpty()? b: b.append(" ").append(unit)).toString();
+ } else {
+ //find traditional binary prefix
+ int i = 0;
+ for(; i < values().length && n >= values()[i].value; i++);
+ TraditionalBinaryPrefix prefix = values()[i - 1];
+
+ if ((n & prefix.bitMask) == 0) {
+ //exact division
+ b.append(n >> prefix.bitShift);
+ } else {
+ final String format = "%." + decimalPlaces + "f";
+ String s = StringUtils.format(format, n/(double)prefix.value);
+ //check a special rounding up case
+ if (s.startsWith("1024")) {
+ prefix = values()[i];
+ s = StringUtils.format(format, n/(double)prefix.value);
+ }
+ b.append(s);
+ }
+ return b.append(' ').append(prefix.symbol).append(unit).toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java b/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java
new file mode 100644
index 0000000..a724fb3
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/util/TestTimeDuration.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.ratis.util.TimeDuration.Abbreviation;
+import static org.apache.ratis.util.TimeDuration.parse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestTimeDuration {
+ @Test(timeout = 10000)
+ public void testTimeDuration() throws Exception {
+ Arrays.asList(TimeUnit.values())
+ .forEach(a -> assertNotNull(Abbreviation.valueOf(a.name())));
+ assertEquals(TimeUnit.values().length, Abbreviation.values().length);
+
+ final List<String> allSymbols = Arrays.asList(Abbreviation.values()).stream()
+ .map(Abbreviation::getSymbols)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ Arrays.asList(TimeUnit.values()).forEach(unit ->
+ allSymbols.stream()
+ .map(s -> "0" + s)
+ .forEach(s -> assertEquals(s, 0L, parse(s, unit))));
+
+ assertEquals(1L, parse("1000000 ns", TimeUnit.MILLISECONDS));
+ assertEquals(10L, parse("10000000 nanos", TimeUnit.MILLISECONDS));
+ assertEquals(100L, parse("100000000 nanosecond", TimeUnit.MILLISECONDS));
+ assertEquals(1000L, parse("1000000000 nanoseconds", TimeUnit.MILLISECONDS));
+
+ assertEquals(1L, parse("1000 us", TimeUnit.MILLISECONDS));
+ assertEquals(10L, parse("10000 \u03bcs", TimeUnit.MILLISECONDS));
+ assertEquals(100L, parse("100000 micros", TimeUnit.MILLISECONDS));
+ assertEquals(1000L, parse("1000000 microsecond", TimeUnit.MILLISECONDS));
+ assertEquals(10000L, parse("10000000 microseconds", TimeUnit.MILLISECONDS));
+
+ assertEquals(1L, parse("1 ms", TimeUnit.MILLISECONDS));
+ assertEquals(10L, parse("10 msec", TimeUnit.MILLISECONDS));
+ assertEquals(100L, parse("100 millis", TimeUnit.MILLISECONDS));
+ assertEquals(1000L, parse("1000 millisecond", TimeUnit.MILLISECONDS));
+ assertEquals(10000L, parse("10000 milliseconds", TimeUnit.MILLISECONDS));
+
+ assertEquals(1000L, parse("1 s", TimeUnit.MILLISECONDS));
+ assertEquals(10000L, parse("10 sec", TimeUnit.MILLISECONDS));
+ assertEquals(100000L, parse("100 second", TimeUnit.MILLISECONDS));
+ assertEquals(1000000L, parse("1000 seconds", TimeUnit.MILLISECONDS));
+
+ assertEquals(60, parse("1 m", TimeUnit.SECONDS));
+ assertEquals(600, parse("10 min", TimeUnit.SECONDS));
+ assertEquals(6000, parse("100 minutes", TimeUnit.SECONDS));
+ assertEquals(60000, parse("1000 minutes", TimeUnit.SECONDS));
+
+ assertEquals(60, parse("1 h", TimeUnit.MINUTES));
+ assertEquals(600, parse("10 hr", TimeUnit.MINUTES));
+ assertEquals(6000, parse("100 hour", TimeUnit.MINUTES));
+ assertEquals(60000, parse("1000 hours", TimeUnit.MINUTES));
+
+ assertEquals(24, parse("1 d", TimeUnit.HOURS));
+ assertEquals(240, parse("10 day", TimeUnit.HOURS));
+ assertEquals(2400, parse("100 days", TimeUnit.HOURS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java b/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java
new file mode 100644
index 0000000..ed9aadb
--- /dev/null
+++ b/ratis-common/src/test/java/org/apache/ratis/util/TestTraditionalBinaryPrefix.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.junit.Test;
+
+import static org.apache.ratis.util.TraditionalBinaryPrefix.long2String;
+import static org.apache.ratis.util.TraditionalBinaryPrefix.string2long;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class TestTraditionalBinaryPrefix {
+ @Test(timeout = 10000)
+ public void testTraditionalBinaryPrefix() throws Exception {
+ //test string2long(..)
+ String[] symbol = {"k", "m", "g", "t", "p", "e"};
+ long m = 1024;
+ for(String s : symbol) {
+ assertEquals(0, string2long(0 + s));
+ assertEquals(m, string2long(1 + s));
+ m *= 1024;
+ }
+
+ assertEquals(0L, string2long("0"));
+ assertEquals(1024L, string2long("1k"));
+ assertEquals(-1024L, string2long("-1k"));
+ assertEquals(1259520L, string2long("1230K"));
+ assertEquals(-1259520L, string2long("-1230K"));
+ assertEquals(104857600L, string2long("100m"));
+ assertEquals(-104857600L, string2long("-100M"));
+ assertEquals(956703965184L, string2long("891g"));
+ assertEquals(-956703965184L, string2long("-891G"));
+ assertEquals(501377302265856L, string2long("456t"));
+ assertEquals(-501377302265856L, string2long("-456T"));
+ assertEquals(11258999068426240L, string2long("10p"));
+ assertEquals(-11258999068426240L, string2long("-10P"));
+ assertEquals(1152921504606846976L, string2long("1e"));
+ assertEquals(-1152921504606846976L, string2long("-1E"));
+
+ String tooLargeNumStr = "10e";
+ try {
+ string2long(tooLargeNumStr);
+ fail("Test passed for a number " + tooLargeNumStr + " too large");
+ } catch (IllegalArgumentException e) {
+ assertEquals(tooLargeNumStr + " does not fit in a Long", e.getMessage());
+ }
+
+ String tooSmallNumStr = "-10e";
+ try {
+ string2long(tooSmallNumStr);
+ fail("Test passed for a number " + tooSmallNumStr + " too small");
+ } catch (IllegalArgumentException e) {
+ assertEquals(tooSmallNumStr + " does not fit in a Long", e.getMessage());
+ }
+
+ String invalidFormatNumStr = "10kb";
+ char invalidPrefix = 'b';
+ try {
+ string2long(invalidFormatNumStr);
+ fail("Test passed for a number " + invalidFormatNumStr
+ + " has invalid format");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Invalid size prefix '" + invalidPrefix + "' in '"
+ + invalidFormatNumStr
+ + "'. Allowed prefixes are k, m, g, t, p, e(case insensitive)",
+ e.getMessage());
+ }
+
+ //test long2string(..)
+ assertEquals("0", long2String(0, null, 2));
+ for(int decimalPlace = 0; decimalPlace < 2; decimalPlace++) {
+ for(int n = 1; n < TraditionalBinaryPrefix.KILO.getValue(); n++) {
+ assertEquals(n + "", long2String(n, null, decimalPlace));
+ assertEquals(-n + "", long2String(-n, null, decimalPlace));
+ }
+ assertEquals("1 K", long2String(1L << 10, null, decimalPlace));
+ assertEquals("-1 K", long2String(-1L << 10, null, decimalPlace));
+ }
+
+ assertEquals("8.00 E", long2String(Long.MAX_VALUE, null, 2));
+ assertEquals("8.00 E", long2String(Long.MAX_VALUE - 1, null, 2));
+ assertEquals("-8 E", long2String(Long.MIN_VALUE, null, 2));
+ assertEquals("-8.00 E", long2String(Long.MIN_VALUE + 1, null, 2));
+
+ final String[] zeros = {" ", ".0 ", ".00 "};
+ for(int decimalPlace = 0; decimalPlace < zeros.length; decimalPlace++) {
+ final String trailingZeros = zeros[decimalPlace];
+
+ for(int e = 11; e < Long.SIZE - 1; e++) {
+ final TraditionalBinaryPrefix p
+ = TraditionalBinaryPrefix.values()[e/10 - 1];
+
+ { // n = 2^e
+ final long n = 1L << e;
+ final String expected = (n/p.getValue()) + " " + p.getSymbol();
+ assertEquals("n=" + n, expected, long2String(n, null, 2));
+ }
+
+ { // n = 2^e + 1
+ final long n = (1L << e) + 1;
+ final String expected = (n/p.getValue()) + trailingZeros + p.getSymbol();
+ assertEquals("n=" + n, expected, long2String(n, null, decimalPlace));
+ }
+
+ { // n = 2^e - 1
+ final long n = (1L << e) - 1;
+ final String expected = ((n+1)/p.getValue()) + trailingZeros + p.getSymbol();
+ assertEquals("n=" + n, expected, long2String(n, null, decimalPlace));
+ }
+ }
+ }
+
+ assertEquals("1.50 K", long2String(3L << 9, null, 2));
+ assertEquals("1.5 K", long2String(3L << 9, null, 1));
+ assertEquals("1.50 M", long2String(3L << 19, null, 2));
+ assertEquals("2 M", long2String(3L << 19, null, 0));
+ assertEquals("3 G", long2String(3L << 30, null, 2));
+
+ assertEquals("0 B", byteDescription(0));
+ assertEquals("-100 B", byteDescription(-100));
+ assertEquals("1 KB", byteDescription(1024));
+ assertEquals("1.50 KB", byteDescription(3L << 9));
+ assertEquals("1.50 MB", byteDescription(3L << 19));
+ assertEquals("3 GB", byteDescription(3L << 30));
+ }
+
+ private static String byteDescription(long len) {
+ return long2String(len, "B", 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-grpc/pom.xml b/ratis-grpc/pom.xml
index 6ffad7c..99af552 100644
--- a/ratis-grpc/pom.xml
+++ b/ratis-grpc/pom.xml
@@ -37,7 +37,6 @@
<dependency>
<artifactId>ratis-common</artifactId>
<groupId>org.apache.ratis</groupId>
- <scope>provided</scope>
</dependency>
<dependency>
<artifactId>ratis-common</artifactId>
@@ -49,7 +48,6 @@
<dependency>
<artifactId>ratis-client</artifactId>
<groupId>org.apache.ratis</groupId>
- <scope>provided</scope>
</dependency>
<dependency>
<artifactId>ratis-client</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
new file mode 100644
index 0000000..430fd17
--- /dev/null
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+
+import static org.apache.ratis.conf.ConfUtils.*;
+
+public interface GrpcConfigKeys {
+ String PREFIX = "raft.grpc";
+
+ interface Server {
+ String PREFIX = GrpcConfigKeys.PREFIX + ".server";
+
+ String PORT_KEY = PREFIX + ".port";
+ int PORT_DEFAULT = 0;
+ static int port(RaftProperties properties) {
+ return getInt(properties::getInt,
+ PORT_KEY, PORT_DEFAULT, requireMin(0), requireMax(65536));
+ }
+ static void setPort(RaftProperties properties, int port) {
+ setInt(properties::setInt, PORT_KEY, port);
+ }
+
+ String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
+ SizeInBytes MESSAGE_SIZE_MAX_DEFAULT = SizeInBytes.valueOf("64MB");
+ static SizeInBytes messageSizeMax(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes,
+ MESSAGE_SIZE_MAX_KEY, MESSAGE_SIZE_MAX_DEFAULT);
+ }
+
+ String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max";
+ int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
+ static int leaderOutstandingAppendsMax(RaftProperties properties) {
+ return getInt(properties::getInt,
+ LEADER_OUTSTANDING_APPENDS_MAX_KEY, LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT, requireMin(0));
+ }
+ }
+
+ interface OutputStream {
+ String PREFIX = GrpcConfigKeys.PREFIX + ".outputstream";
+
+ String BUFFER_SIZE_KEY = PREFIX + ".buffer.size";
+ SizeInBytes BUFFER_SIZE_DEFAULT = SizeInBytes.valueOf("64KB");
+ static SizeInBytes bufferSize(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes,
+ BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
+ }
+ static void setBufferSize(RaftProperties properties, SizeInBytes bufferSize) {
+ setSizeInBytesInt(properties::set, BUFFER_SIZE_KEY, bufferSize);
+ }
+
+ String RETRY_TIMES_KEY = PREFIX + ".retry.times";
+ int RETRY_TIMES_DEFAULT = 5;
+ static int retryTimes(RaftProperties properties) {
+ return getInt(properties::getInt,
+ RETRY_TIMES_KEY, RETRY_TIMES_DEFAULT, requireMin(1));
+ }
+
+ String RETRY_INTERVAL_KEY = PREFIX + ".retry.interval";
+ TimeDuration RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.Rpc.TIMEOUT_DEFAULT;
+ static TimeDuration retryInterval(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(RETRY_INTERVAL_DEFAULT.getUnit()),
+ RETRY_INTERVAL_KEY, RETRY_INTERVAL_DEFAULT);
+ }
+
+ String OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".outstanding.appends.max";
+ int OUTSTANDING_APPENDS_MAX_DEFAULT = 128;
+ static int outstandingAppendsMax(RaftProperties properties) {
+ return getInt(properties::getInt,
+ OUTSTANDING_APPENDS_MAX_KEY, OUTSTANDING_APPENDS_MAX_DEFAULT, requireMin(0));
+ }
+ }
+
+ static void main(String[] args) {
+ printAll(GrpcConfigKeys.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index 875efbb..3185a37 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -41,8 +41,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.*;
-
/** A grpc implementation of {@link RaftServerRpc}. */
public class RaftGRpcService implements RaftServerRpc {
static final Logger LOG = LoggerFactory.getLogger(RaftGRpcService.class);
@@ -75,10 +73,8 @@ public class RaftGRpcService implements RaftServerRpc {
private RaftGRpcService(RaftServer server) {
this(server,
- server.getProperties().getInt(RAFT_GRPC_SERVER_PORT_KEY,
- RAFT_GRPC_SERVER_PORT_DEFAULT),
- server.getProperties().getInt(RAFT_GRPC_MESSAGE_MAXSIZE_KEY,
- RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT));
+ GrpcConfigKeys.Server.port(server.getProperties()),
+ GrpcConfigKeys.Server.messageSizeMax(server.getProperties()).getSizeInt());
}
private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) {
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
deleted file mode 100644
index ffec8ff..0000000
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGrpcConfigKeys.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ratis.grpc;
-
-import org.apache.ratis.client.RaftClientConfigKeys;
-
-public interface RaftGrpcConfigKeys {
- String PREFIX = "raft.grpc";
-
- String RAFT_GRPC_SERVER_PORT_KEY = PREFIX + ".server.port";
- int RAFT_GRPC_SERVER_PORT_DEFAULT = 0;
-
- String RAFT_GRPC_MESSAGE_MAXSIZE_KEY = PREFIX + ".message.maxsize";
- int RAFT_GRPC_MESSAGE_MAXSIZE_DEFAULT = 64 * 1024 * 1024; // 64 MB
-
- String RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY =
- PREFIX + "leader.max.outstanding.appends";
- int RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
-
- String RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY =
- PREFIX + "client.max.outstanding.appends";
- int RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT = 128;
-
- String RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY = "raft.outputstream.buffer.size";
- int RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT = 64 * 1024;
-
- String RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY = "raft.outputstream.max.retry.times";
- int RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT = 5;
-
- String RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY = "raft.outputstream.retry.interval";
- long RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT = RaftClientConfigKeys.RAFT_RPC_TIMEOUT_MS_DEFAULT;
-}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 6d7b207..553ec9a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -18,19 +18,15 @@
package org.apache.ratis.grpc.client;
import com.google.common.base.Preconditions;
-
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.RaftGrpcUtil;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.RaftGrpcConfigKeys;
-import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.RaftUtils;
@@ -41,14 +37,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.ratis.client.impl.ClientProtoUtils.*;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY;
public class AppendStreamer implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AppendStreamer.class);
@@ -59,16 +52,11 @@ public class AppendStreamer implements Closeable {
private final Map<RaftPeerId, IOException> exceptionMap = new HashMap<>();
private final AtomicInteger retryTimes = new AtomicInteger(0);
private final int maxRetryTimes;
- private final long retryInterval;
+ private final TimeDuration retryInterval;
ExceptionAndRetry(RaftProperties prop) {
- maxRetryTimes = prop.getInt(
- RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_KEY,
- RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_MAX_RETRY_TIMES_DEFAULT);
- retryInterval = prop.getTimeDuration(
- RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_KEY,
- RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_RETRY_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS);
+ maxRetryTimes = GrpcConfigKeys.OutputStream.retryTimes(prop);
+ retryInterval = GrpcConfigKeys.OutputStream.retryInterval(prop);
}
void addException(RaftPeerId peer, IOException e) {
@@ -102,9 +90,7 @@ public class AppendStreamer implements Closeable {
AppendStreamer(RaftProperties prop, Collection<RaftPeer> peers,
RaftPeerId leaderId, ClientId clientId) {
this.clientId = clientId;
- maxPendingNum = prop.getInt(
- RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_KEY,
- RAFT_GRPC_CLIENT_MAX_OUTSTANDING_APPENDS_DEFAULT);
+ maxPendingNum = GrpcConfigKeys.OutputStream.outstandingAppendsMax(prop);
dataQueue = new ConcurrentLinkedDeque<>();
ackQueue = new ConcurrentLinkedDeque<>();
exceptionAndRetry = new ExceptionAndRetry(prop);
@@ -359,7 +345,7 @@ public class AppendStreamer implements Closeable {
dataQueue.poll());
ackQueue.offer(request);
try {
- Thread.sleep(exceptionAndRetry.retryInterval);
+ exceptionAndRetry.retryInterval.sleep();
} catch (InterruptedException ignored) {
}
leaderProxy.onNext(request);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
index 73e56b8..33d3d22 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
@@ -17,20 +17,18 @@
*/
package org.apache.ratis.grpc.client;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.ProtoUtils;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+
public class RaftOutputStream extends OutputStream {
/** internal buffer */
private final byte buf[];
@@ -43,8 +41,7 @@ public class RaftOutputStream extends OutputStream {
public RaftOutputStream(RaftProperties prop, ClientId clientId,
Collection<RaftPeer> peers, RaftPeerId leaderId) {
- final int bufferSize = prop.getInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY,
- RAFT_OUTPUTSTREAM_BUFFER_SIZE_DEFAULT);
+ final int bufferSize = GrpcConfigKeys.OutputStream.bufferSize(prop).getSizeInt();
buf = new byte[bufferSize];
count = 0;
this.clientId = clientId;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 5f1d901..91dc02c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -26,7 +26,7 @@ import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.grpc.RaftGRpcService;
-import org.apache.ratis.grpc.RaftGrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.server.impl.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
@@ -63,9 +63,7 @@ public class GRpcLogAppender extends LogAppender {
RaftGRpcService rpcService = (RaftGRpcService) server.getServerRpc();
client = rpcService.getRpcClient(f.getPeer());
- maxPendingRequestsNum = server.getProperties().getInt(
- RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_KEY,
- RaftGrpcConfigKeys.RAFT_GRPC_LEADER_MAX_OUTSTANDING_APPENDS_DEFAULT);
+ maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(server.getProperties());
pendingRequests = new ConcurrentLinkedQueue<>();
appendResponseHandler = new AppendLogResponseHandler();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
index 9676a48..c3ca707 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGRpc.java
@@ -34,7 +34,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
@Override
public MiniRaftClusterWithGRpc newCluster(
String[] ids, RaftProperties prop) {
- RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.GRPC);
+ RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
return new MiniRaftClusterWithGRpc(ids, prop);
}
};
@@ -50,7 +50,7 @@ public class MiniRaftClusterWithGRpc extends MiniRaftCluster.RpcBase {
protected RaftServerImpl newRaftServer(
RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
RaftProperties properties) throws IOException {
- properties.setInt(RaftGrpcConfigKeys.RAFT_GRPC_SERVER_PORT_KEY, getPort(id, conf));
+ GrpcConfigKeys.Server.setPort(properties, getPort(id, conf));
return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
index d7cb75a..4bae5a9 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftStream.java
@@ -18,11 +18,12 @@
package org.apache.ratis.grpc;
import org.apache.log4j.Level;
-import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.grpc.client.AppendStreamer;
import org.apache.ratis.grpc.client.RaftOutputStream;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -42,7 +43,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
-import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY;
import static org.junit.Assert.fail;
public class TestRaftStream {
@@ -81,7 +81,7 @@ public class TestRaftStream {
LOG.info("Running testSimpleWrite");
// default 64K is too large for a test
- prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
+ GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
@@ -122,7 +122,7 @@ public class TestRaftStream {
public void testWriteAndFlush() throws Exception {
LOG.info("Running testWriteAndFlush");
- prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
+ GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
@@ -200,7 +200,7 @@ public class TestRaftStream {
@Test
public void testWriteWithOffset() throws Exception {
LOG.info("Running testWriteWithOffset");
- prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, ByteValue.BUFFERSIZE);
+ GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(ByteValue.BUFFERSIZE));
cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
@@ -259,7 +259,7 @@ public class TestRaftStream {
public void testKillLeader() throws Exception {
LOG.info("Running testChangeLeader");
- prop.setInt(RAFT_OUTPUTSTREAM_BUFFER_SIZE_KEY, 4);
+ GrpcConfigKeys.OutputStream.setBufferSize(prop, SizeInBytes.valueOf(4));
cluster = MiniRaftClusterWithGRpc.FACTORY.newCluster(NUM_SERVERS, prop);
cluster.start();
final RaftServerImpl leader = waitForLeader(cluster);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/ratis-hadoop/pom.xml b/ratis-hadoop/pom.xml
index f3be6a6..98ea84f 100644
--- a/ratis-hadoop/pom.xml
+++ b/ratis-hadoop/pom.xml
@@ -41,7 +41,6 @@
<dependency>
<artifactId>ratis-common</artifactId>
<groupId>org.apache.ratis</groupId>
- <scope>provided</scope>
</dependency>
<dependency>
<artifactId>ratis-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
index f65ee5d..5d66480 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopConfigKeys.java
@@ -18,28 +18,25 @@
package org.apache.ratis.hadooprpc;
import org.apache.hadoop.conf.Configuration;
-import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.Parameters;
import java.net.InetSocketAddress;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.*;
/** Hadoop Rpc specific configuration properties. */
public interface HadoopConfigKeys {
- String PREFIX = "raft.hadooprpc";
+ String PREFIX = "raft.hadoop";
- String CONF_KEY = PREFIX + ".conf";
+ String CONF_PARAMETER = PREFIX + ".conf";
+ Class<Configuration> CONF_CLASS = Configuration.class;
- static Configuration getConf(
- BiFunction<String, Class<Configuration>, Configuration> getConf) {
- return getConf.apply(CONF_KEY, Configuration.class);
+ static Configuration getConf(Parameters parameters) {
+ return parameters.get(CONF_PARAMETER, CONF_CLASS);
}
static void setConf(Parameters parameters, Configuration conf) {
- parameters.put(CONF_KEY, conf, Configuration.class);
+ parameters.put(CONF_PARAMETER, conf, Configuration.class);
}
/** IPC server configurations */
@@ -47,24 +44,28 @@ public interface HadoopConfigKeys {
String PREFIX = HadoopConfigKeys.PREFIX + ".ipc";
String ADDRESS_KEY = PREFIX + ".address";
- int DEFAULT_PORT = 10718;
- String ADDRESS_DEFAULT = "0.0.0.0:" + DEFAULT_PORT;
+ int PORT_DEFAULT = 10718;
+ String ADDRESS_DEFAULT = "0.0.0.0:" + PORT_DEFAULT;
String HANDLERS_KEY = PREFIX + ".handlers";
int HANDLERS_DEFAULT = 10;
- static int handlers(BiFunction<String, Integer, Integer> getInt) {
- return ConfUtils.getInt(getInt,
+ static int handlers(Configuration conf) {
+ return getInt(conf::getInt,
HANDLERS_KEY, HANDLERS_DEFAULT, requireMin(1));
}
- static InetSocketAddress address(BiFunction<String, String, String> getTrimmed) {
- return ConfUtils.getInetSocketAddress(getTrimmed,
+ static InetSocketAddress address(Configuration conf) {
+ return getInetSocketAddress(conf::getTrimmed,
ADDRESS_KEY, ADDRESS_DEFAULT);
}
- static void setAddress(BiConsumer<String, String> setString, String address) {
- ConfUtils.set(setString, ADDRESS_KEY, address);
+ static void setAddress(Configuration conf, String address) {
+ set(conf::set, ADDRESS_KEY, address);
}
}
+
+ static void main(String[] args) {
+ printAll(HadoopConfigKeys.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index ad866c8..419bfaa 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -36,7 +36,7 @@ public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFa
private final Configuration conf;
public HadoopFactory(Parameters parameters) {
- this(HadoopConfigKeys.getConf(parameters::get));
+ this(HadoopConfigKeys.getConf(parameters));
}
public HadoopFactory(Configuration conf) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 00d69aa..ae8bf37 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -119,8 +119,8 @@ public class HadoopRpcService implements RaftServerRpc {
private static RPC.Server newRpcServer(
RaftServerProtocol serverProtocol, final Configuration conf)
throws IOException {
- final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf::getInt);
- final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf::getTrimmed);
+ final int handlerCount = HadoopConfigKeys.Ipc.handlers(conf);
+ final InetSocketAddress address = HadoopConfigKeys.Ipc.address(conf);
final BlockingService service
= RaftServerProtocolService.newReflectiveBlockingService(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
index b3a607b..7c11f8e 100644
--- a/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
+++ b/ratis-hadoop/src/test/java/org/apache/ratis/hadooprpc/MiniRaftClusterWithHadoopRpc.java
@@ -52,8 +52,8 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
public MiniRaftClusterWithHadoopRpc newCluster(
String[] ids, RaftProperties prop, Configuration conf) {
- RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.HADOOP);
- HadoopConfigKeys.Ipc.setAddress(conf::set, "0.0.0.0:0");
+ RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.HADOOP);
+ HadoopConfigKeys.Ipc.setAddress(conf, "0.0.0.0:0");
return new MiniRaftClusterWithHadoopRpc(ids, prop, conf);
}
}
@@ -77,7 +77,7 @@ public class MiniRaftClusterWithHadoopRpc extends MiniRaftCluster.RpcBase {
RaftProperties properties) throws IOException {
final Configuration hconf = new Configuration(hadoopConf);
final String address = "0.0.0.0:" + getPort(id, conf);
- HadoopConfigKeys.Ipc.setAddress(hconf::set, address);
+ HadoopConfigKeys.Ipc.setAddress(hconf, address);
return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties,
HadoopFactory.newRaftParameters(hconf));
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index fd2703d..aa0d9f6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -17,13 +17,9 @@
*/
package org.apache.ratis.netty;
-import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-
-import static org.apache.ratis.conf.ConfUtils.requireMax;
-import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.*;
public interface NettyConfigKeys {
String PREFIX = "raft.netty";
@@ -34,17 +30,17 @@ public interface NettyConfigKeys {
String PORT_KEY = PREFIX + ".port";
int PORT_DEFAULT = 0;
- static int port(BiFunction<String, Integer, Integer> getInt) {
- return ConfUtils.getInt(getInt,
+ static int port(RaftProperties properties) {
+ return getInt(properties::getInt,
PORT_KEY, PORT_DEFAULT, requireMin(0), requireMax(65536));
}
- static void setPort(BiConsumer<String, Integer> setInt, int port) {
- ConfUtils.setInt(setInt, PORT_KEY, port);
+ static void setPort(RaftProperties properties, int port) {
+ setInt(properties::setInt, PORT_KEY, port);
}
}
static void main(String[] args) {
- ConfUtils.printAll(NettyConfigKeys.class);
+ printAll(NettyConfigKeys.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 2d927db..7487219 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -115,7 +115,7 @@ public final class NettyRpcService implements RaftServerRpc {
}
};
- final int port = NettyConfigKeys.Server.port(server.getProperties()::getInt);
+ final int port = NettyConfigKeys.Server.port(server.getProperties());
channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 697aef6..126624d 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -37,7 +37,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
= new Factory<MiniRaftClusterWithNetty>() {
@Override
public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
- RaftConfigKeys.Rpc.setType(prop::set, SupportedRpcType.NETTY);
+ RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
return new MiniRaftClusterWithNetty(ids, prop);
}
};
@@ -53,7 +53,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
protected RaftServerImpl newRaftServer(
RaftPeerId id, StateMachine stateMachine, RaftConfiguration conf,
RaftProperties properties) throws IOException {
- NettyConfigKeys.Server.setPort(properties::setInt, getPort(id, conf));
+ NettyConfigKeys.Server.setPort(properties, getPort(id, conf));
return ServerImplUtils.newRaftServer(id, stateMachine, conf, properties, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4e5d0d79/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 718c277..a307101 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -42,7 +42,7 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
@Override
public MiniRaftClusterWithSimulatedRpc newCluster(
String[] ids, RaftProperties prop) {
- RaftConfigKeys.Rpc.setType(prop::set, SimulatedRpc.INSTANCE);
+ RaftConfigKeys.Rpc.setType(prop, SimulatedRpc.INSTANCE);
if (ThreadLocalRandom.current().nextBoolean()) {
// turn off simulate latency half of the times.
prop.setInt(SimulatedRequestReply.SIMULATE_LATENCY_KEY, 0);