You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/07/12 12:59:44 UTC
[ratis] branch master updated: RATIS-1384.Change pending request
limit unit to MB (#483)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3f4f5ff RATIS-1384.Change pending request limit unit to MB (#483)
3f4f5ff is described below
commit 3f4f5ffc6d34ac906d71feea0d9c6f845663e138
Author: Sadanand Shenoy <sa...@gmail.com>
AuthorDate: Mon Jul 12 17:23:15 2021 +0530
RATIS-1384.Change pending request limit unit to MB (#483)
---
.../main/java/org/apache/ratis/conf/ConfUtils.java | 9 ++++
.../java/org/apache/ratis/util/SizeInBytes.java | 1 +
.../apache/ratis/server/RaftServerConfigKeys.java | 2 +-
.../apache/ratis/server/impl/PendingRequests.java | 63 ++++++++++++++++------
.../server/metrics/RaftServerMetricsImpl.java | 8 +--
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 14 ++---
.../ratis/server/TestRaftServerConfigKeys.java | 20 +++++++
7 files changed, 90 insertions(+), 27 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index b662dff..989070f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -81,6 +81,15 @@ public interface ConfUtils {
};
}
+ static BiConsumer<String, SizeInBytes> requireMinSizeInByte(SizeInBytes min) {
+ return (key, value) -> {
+ if (value.getSize() < min.getSize()) {
+ throw new IllegalArgumentException(
+ key + " = " + value + " < min = " + min);
+ }
+ };
+ }
+
static BiConsumer<String, Long> requireMax(long max) {
return (key, value) -> {
if (value > max) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
index 160cf44..25667a3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SizeInBytes.java
@@ -24,6 +24,7 @@ import java.util.Objects;
*/
public final class SizeInBytes {
public static final SizeInBytes ONE_KB = valueOf("1k");
+ public static final SizeInBytes ONE_MB = valueOf("1m");
public static SizeInBytes valueOf(long size) {
final String s = String.valueOf(size);
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 892cc16..586f359 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -114,7 +114,7 @@ public interface RaftServerConfigKeys {
SizeInBytes BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
static SizeInBytes byteLimit(RaftProperties properties) {
return getSizeInBytes(properties::getSizeInBytes,
- BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog());
+ BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ONE_MB));
}
static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index cda61df..0812c29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -44,32 +44,46 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
class PendingRequests {
public static final Logger LOG = LoggerFactory.getLogger(PendingRequests.class);
+ private static final int ONE_MB = SizeInBytes.ONE_MB.getSizeInt();
+
+ /**
+ * Round up to the nearest MB.
+ */
+ static int roundUpMb(long bytes) {
+ return Math.toIntExact((bytes - 1) / ONE_MB + 1);
+ }
+
static class Permit {}
static class RequestLimits extends ResourceSemaphore.Group {
- RequestLimits(int elementLimit, SizeInBytes byteLimit) {
- super(elementLimit, byteLimit.getSizeInt());
+ RequestLimits(int elementLimit, int megabyteLimit) {
+ super(elementLimit, megabyteLimit);
}
int getElementCount() {
return get(0).used();
}
- int getByteSize() {
+ int getMegaByteSize() {
return get(1).used();
}
- ResourceSemaphore.ResourceAcquireStatus tryAcquire(Message message) {
- return tryAcquire(1, Message.getSize(message));
+ ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) {
+ return tryAcquire(1, messageSizeMb);
}
- void release(Message message) {
- release(1, Message.getSize(message));
+ void releaseExtraMb(int extraMb) {
+ release(0, extraMb);
+ }
+
+ void release(int diffMb) {
+ release(1, diffMb);
}
}
@@ -82,19 +96,24 @@ class PendingRequests {
private final Map<Permit, Permit> permits = new HashMap<>();
/** Track and limit the number of requests and the total message size. */
private final RequestLimits resource;
+ /** The size (in byte) of all the requests in this map. */
+ private final AtomicLong requestSize = new AtomicLong();
+
- RequestMap(Object name, int elementLimit, SizeInBytes byteLimit, RaftServerMetricsImpl raftServerMetrics) {
+ RequestMap(Object name, int elementLimit, int megabyteLimit, RaftServerMetricsImpl raftServerMetrics) {
this.name = name;
- this.resource = new RequestLimits(elementLimit, byteLimit);
+ this.resource = new RequestLimits(elementLimit, megabyteLimit);
this.raftServerMetrics = raftServerMetrics;
raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
- raftServerMetrics.addNumPendingRequestsByteSize(resource::getByteSize);
+ raftServerMetrics.addNumPendingRequestsMegaByteSize(resource::getMegaByteSize);
}
Permit tryAcquire(Message message) {
- final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(message);
- LOG.trace("tryAcquire? {}", acquired);
+ final int messageSize = Message.getSize(message);
+ final int messageSizeMb = roundUpMb(messageSize );
+ final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb);
+ LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired);
if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) {
raftServerMetrics.onRequestQueueLimitHit();
raftServerMetrics.onResourceLimitHit();
@@ -104,6 +123,14 @@ class PendingRequests {
raftServerMetrics.onResourceLimitHit();
return null;
}
+
+ // release extra MB
+ final long oldSize = requestSize.getAndAdd(messageSize);
+ final long newSize = oldSize + messageSize;
+ final int diffMb = roundUpMb(newSize) - roundUpMb(oldSize);
+ if (messageSizeMb > diffMb) {
+ resource.releaseExtraMb(messageSizeMb - diffMb);
+ }
return putPermit();
}
@@ -140,8 +167,12 @@ class PendingRequests {
if (r == null) {
return null;
}
- resource.release(r.getRequest().getMessage());
- LOG.trace("release");
+ final int messageSize = Message.getSize(r.getRequest().getMessage());
+ final long oldSize = requestSize.getAndAdd(-messageSize);
+ final long newSize = oldSize - messageSize;
+ final int diffMb = roundUpMb(oldSize) - roundUpMb(newSize);
+ resource.release(diffMb);
+ LOG.trace("release {} MB", diffMb);
return r;
}
@@ -183,7 +214,9 @@ class PendingRequests {
this.name = id + "-" + JavaUtils.getClassSimpleName(getClass());
this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties),
- RaftServerConfigKeys.Write.byteLimit(properties),
+ Math.toIntExact(
+ RaftServerConfigKeys.Write.byteLimit(properties).getSize()
+ / SizeInBytes.ONE_MB.getSize()), //round down
raftServerMetrics);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
index 28f2950..4a0a9f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetricsImpl.java
@@ -61,7 +61,7 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
public static final String RESOURCE_LIMIT_HIT_COUNTER = "leaderNumResourceLimitHits";
public static final String REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER = "numRequestsByteSizeLimitHits";
public static final String REQUEST_QUEUE_SIZE = "numPendingRequestInQueue";
- public static final String REQUEST_BYTE_SIZE = "numPendingRequestByteSize";
+ public static final String REQUEST_MEGA_BYTE_SIZE = "numPendingRequestMegaByteSize";
public static final String RETRY_CACHE_ENTRY_COUNT_METRIC = "retryCacheEntryCount";
public static final String RETRY_CACHE_HIT_COUNT_METRIC = "retryCacheHitCount";
public static final String RETRY_CACHE_HIT_RATE_METRIC = "retryCacheHitRate";
@@ -221,12 +221,12 @@ public final class RaftServerMetricsImpl extends RatisMetrics implements RaftSer
return registry.remove(REQUEST_QUEUE_SIZE);
}
- public void addNumPendingRequestsByteSize(Gauge byteSize) {
- registry.gauge(REQUEST_BYTE_SIZE, () -> byteSize);
+ public void addNumPendingRequestsMegaByteSize(Gauge megabyteSize) {
+ registry.gauge(REQUEST_MEGA_BYTE_SIZE, () -> megabyteSize);
}
public boolean removeNumPendingRequestsByteSize() {
- return registry.remove(REQUEST_BYTE_SIZE);
+ return registry.remove(REQUEST_MEGA_BYTE_SIZE);
}
public void onRequestByteSizeLimitHit() {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index b170309..71783e8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -22,7 +22,7 @@ import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WATCH_REQUEST;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RAFT_CLIENT_WRITE_REQUEST;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE;
+import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_MEGA_BYTE_SIZE;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
import static org.apache.ratis.server.metrics.RaftServerMetricsImpl.RESOURCE_LIMIT_HIT_COUNTER;
@@ -210,7 +210,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
public void testRaftServerMetrics() throws Exception {
final RaftProperties p = getProperties();
RaftServerConfigKeys.Write.setElementLimit(p, 10);
- RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf(110));
+ RaftServerConfigKeys.Write.setByteLimit(p, SizeInBytes.valueOf("1MB"));
try {
runWithNewCluster(3, this::testRequestMetrics);
} finally {
@@ -242,10 +242,9 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
final SortedMap<String, Gauge> gaugeMap = getRaftServerMetrics(cluster.getLeader())
- .getRegistry().getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+ .getRegistry().getGauges((s, metric) -> s.contains(
+ REQUEST_MEGA_BYTE_SIZE));
- RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
- 300, 5000);
for (int i = 0; i < 10; i++) {
client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
@@ -259,11 +258,12 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
stateMachine.unblockFlushStateMachineData();
- // Send a message with 120, our byte size limit is 110, so it should fail
+ // Send a message with 1025kb , our byte size limit is 1024kb (1mb) , so it should fail
// and byte size counter limit will be hit.
client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
- client.async().send(new SimpleMessage(RandomStringUtils.random(120, true, false)));
+ client.async().send(new SimpleMessage(RandomStringUtils
+ .random(SizeInBytes.valueOf("1025kb").getSizeInt(), true, false)));
clients.add(client);
RaftTestUtil.waitFor(() -> getRaftServerMetrics(cluster.getLeader())
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java b/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
index 0b56bef..bb386e8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/TestRaftServerConfigKeys.java
@@ -22,6 +22,7 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.SizeInBytes;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
@@ -36,6 +37,10 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.ratis.conf.ConfUtils.requireMin;
+import static org.apache.ratis.conf.ConfUtils.setSizeInBytes;
+import static org.apache.ratis.server.RaftServerConfigKeys.Write.BYTE_LIMIT_KEY;
+
/**
* Test cases to verify RaftServerConfigKeys.
*/
@@ -95,4 +100,19 @@ public class TestRaftServerConfigKeys {
Assert.assertEquals(directories.size(), storageDirs.size());
Assert.assertEquals(0, actualDirs.size());
}
+
+ /**
+ * Sets the value to <code>raft.server.write.byte-limit</code> via
+ * RaftServerConfigKeys and also verifies the same via RaftServerConfigKeys.
+ */
+ @Test public void testPendingRequestSize() {
+ RaftProperties properties = new RaftProperties();
+ // setting to 4GB
+ setSizeInBytes(properties::set, BYTE_LIMIT_KEY, SizeInBytes.valueOf("4gb"),
+ requireMin(1L));
+ int pendingRequestMegabyteLimit = Math.toIntExact(
+ RaftServerConfigKeys.Write.byteLimit(properties).getSize()
+ / SizeInBytes.ONE_MB.getSize());
+ Assert.assertEquals(4096, pendingRequestMegabyteLimit);
+ }
}
\ No newline at end of file