You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/09/27 10:38:59 UTC
[ratis] branch master updated: RATIS-1407. Integer overflow when
set raft.server.log.queue.limit" to 4GB by Ozone (#505)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c7d4b6a RATIS-1407. Integer overflow when set raft.server.log.queue.limit" to 4GB by Ozone (#505)
c7d4b6a is described below
commit c7d4b6a16009884146c839e1c0e1e27a005d9cec
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Mon Sep 27 18:38:55 2021 +0800
RATIS-1407. Integer overflow when set raft.server.log.queue.limit" to 4GB by Ozone (#505)
---
.../org/apache/ratis/util/DataBlockingQueue.java | 6 ++--
.../main/java/org/apache/ratis/util/DataQueue.java | 20 +++++++-------
.../apache/ratis/server/RaftServerConfigKeys.java | 5 ++++
.../apache/ratis/util/TestDataBlockingQueue.java | 16 +++++------
.../java/org/apache/ratis/util/TestDataQueue.java | 32 +++++++++++-----------
5 files changed, 42 insertions(+), 37 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index c71dea5..d6c14ab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
/**
* A queue for data elements
@@ -46,12 +46,12 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
- public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+ public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction<E> getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
@Override
- public int getNumBytes() {
+ public long getNumBytes() {
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
return super.getNumBytes();
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 79d1d92..c4419d4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -31,7 +31,7 @@ import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
/**
* A queue for data elements
@@ -45,18 +45,18 @@ public class DataQueue<E> {
public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
private final Object name;
- private final int byteLimit;
+ private final long byteLimit;
private final int elementLimit;
- private final ToIntFunction<E> getNumBytes;
+ private final ToLongFunction<E> getNumBytes;
private final Queue<E> q;
- private int numBytes = 0;
+ private long numBytes = 0;
public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit,
- ToIntFunction<E> getNumBytes) {
+ ToLongFunction<E> getNumBytes) {
this.name = name != null? name: this;
- this.byteLimit = byteLimit.getSizeInt();
+ this.byteLimit = byteLimit.getSize();
this.elementLimit = elementLimit;
this.getNumBytes = getNumBytes;
this.q = new ArrayDeque<>(elementLimit);
@@ -66,11 +66,11 @@ public class DataQueue<E> {
return elementLimit;
}
- public int getByteLimit() {
+ public long getByteLimit() {
return byteLimit;
}
- public int getNumBytes() {
+ public long getNumBytes() {
return numBytes;
}
@@ -98,7 +98,7 @@ public class DataQueue<E> {
if (elementLimit > 0 && q.size() >= elementLimit) {
return false;
}
- final int elementNumBytes = getNumBytes.applyAsInt(element);
+ final long elementNumBytes = getNumBytes.applyAsLong(element);
Preconditions.assertTrue(elementNumBytes >= 0,
() -> name + ": elementNumBytes = " + elementNumBytes + " < 0");
if (byteLimit > 0) {
@@ -146,7 +146,7 @@ public class DataQueue<E> {
public E poll() {
final E polled = q.poll();
if (polled != null) {
- numBytes -= getNumBytes.applyAsInt(polled);
+ numBytes -= getNumBytes.applyAsLong(polled);
}
return polled;
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 586f359..7748413 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -183,9 +183,14 @@ public interface RaftServerConfigKeys {
return getSizeInBytes(properties::getSizeInBytes,
QUEUE_BYTE_LIMIT_KEY, QUEUE_BYTE_LIMIT_DEFAULT, getDefaultLog());
}
+
+ @Deprecated
static void setQueueByteLimit(RaftProperties properties, int queueSize) {
setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize, requireMin(1));
}
+ static void setQueueByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
+ setSizeInBytes(properties::set, QUEUE_BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
+ }
String PURGE_GAP_KEY = PREFIX + ".purge.gap";
int PURGE_GAP_DEFAULT = 1024;
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
index 7017f97..df0dd2f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
@@ -23,15 +23,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
public class TestDataBlockingQueue {
static final Logger LOG = LoggerFactory.getLogger(TestDataBlockingQueue.class);
final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
final int elementLimit = 10;
- final DataBlockingQueue<Integer> q =
- new DataBlockingQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+ final DataBlockingQueue<Long> q =
+ new DataBlockingQueue<>(null, byteLimit, elementLimit, Long::longValue);
final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
@@ -56,27 +56,27 @@ public class TestDataBlockingQueue {
runTestBlockingCalls(fast, slow, q);
}
- static void assertOfferPull(int offering, int polled, int elementLimit) {
+ static void assertOfferPull(long offering, long polled, long elementLimit) {
Assert.assertTrue(offering >= polled);
Assert.assertTrue(offering - polled <= elementLimit + 1);
}
static void runTestBlockingCalls(TimeDuration offerSleepTime, TimeDuration pollSleepTime,
- DataBlockingQueue<Integer> q) throws Exception {
+ DataBlockingQueue<Long> q) throws Exception {
Assert.assertTrue(q.isEmpty());
ExitUtils.disableSystemExit();
final int elementLimit = q.getElementLimit();
final TimeDuration timeout = CollectionUtils.min(offerSleepTime, pollSleepTime);
- final AtomicInteger offeringValue = new AtomicInteger();
- final AtomicInteger polledValue = new AtomicInteger();
+ final AtomicLong offeringValue = new AtomicLong();
+ final AtomicLong polledValue = new AtomicLong();
final int endValue = 30;
final Thread pollThread = new Thread(() -> {
try {
for(; polledValue.get() < endValue;) {
pollSleepTime.sleep();
- final Integer polled = q.poll(timeout);
+ final Long polled = q.poll(timeout);
if (polled != null) {
Assert.assertEquals(polledValue.incrementAndGet(), polled.intValue());
LOG.info("polled {}", polled);
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
index 08bc36d..846517e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -33,26 +33,26 @@ public class TestDataQueue {
};
}
- static void assertSizes(int expectedNumElements, int expectedNumBytes, DataQueue<?> q) {
+ static void assertSizes(long expectedNumElements, long expectedNumBytes, DataQueue<?> q) {
Assert.assertEquals(expectedNumElements, q.getNumElements());
Assert.assertEquals(expectedNumBytes, q.getNumBytes());
}
final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
final int elementLimit = 5;
- final DataQueue<Integer> q = new DataQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+ final DataQueue<Long> q = new DataQueue<Long>(null, byteLimit, elementLimit, Long::longValue);
@Test(timeout = 1000)
public void testElementLimit() {
runTestElementLimit(q);
}
- static void runTestElementLimit(DataQueue<Integer> q) {
+ static void runTestElementLimit(DataQueue<Long> q) {
assertSizes(0, 0, q);
final int elementLimit = q.getElementLimit();
int numBytes = 0;
- for (int i = 0; i < elementLimit; i++) {
+ for (long i = 0; i < elementLimit; i++) {
Assert.assertEquals(i, q.getNumElements());
Assert.assertEquals(numBytes, q.getNumBytes());
final boolean offered = q.offer(i);
@@ -61,13 +61,13 @@ public class TestDataQueue {
assertSizes(i+1, numBytes, q);
}
{
- final boolean offered = q.offer(0);
+ final boolean offered = q.offer(0L);
Assert.assertFalse(offered);
assertSizes(elementLimit, numBytes, q);
}
{ // poll all elements
- final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
Assert.assertEquals(elementLimit, polled.size());
for (int i = 0; i < polled.size(); i++) {
Assert.assertEquals(i, polled.get(i).intValue());
@@ -81,17 +81,17 @@ public class TestDataQueue {
runTestByteLimit(q);
}
- static void runTestByteLimit(DataQueue<Integer> q) {
+ static void runTestByteLimit(DataQueue<Long> q) {
assertSizes(0, 0, q);
- final int byteLimit = q.getByteLimit();
+ final long byteLimit = q.getByteLimit();
try {
q.offer(byteLimit + 1);
Assert.fail();
} catch (IllegalStateException ignored) {
}
- final int halfBytes = byteLimit / 2;
+ final long halfBytes = byteLimit / 2;
{
final boolean offered = q.offer(halfBytes);
Assert.assertTrue(offered);
@@ -111,19 +111,19 @@ public class TestDataQueue {
}
{
- final boolean offered = q.offer(1);
+ final boolean offered = q.offer(1L);
Assert.assertFalse(offered);
assertSizes(2, byteLimit, q);
}
{
- final boolean offered = q.offer(0);
+ final boolean offered = q.offer(0L);
Assert.assertTrue(offered);
assertSizes(3, byteLimit, q);
}
{ // poll all elements
- final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
Assert.assertEquals(3, polled.size());
Assert.assertEquals(halfBytes, polled.get(0).intValue());
Assert.assertEquals(halfBytes, polled.get(1).intValue());
@@ -138,7 +138,7 @@ public class TestDataQueue {
assertSizes(0, 0, q);
int numBytes = 0;
- for (int i = 0; i < elementLimit; i++) {
+ for (long i = 0; i < elementLimit; i++) {
Assert.assertEquals(i, q.getNumElements());
Assert.assertEquals(numBytes, q.getNumBytes());
final boolean offered = q.offer(i);
@@ -148,14 +148,14 @@ public class TestDataQueue {
}
{ // poll with zero time
- final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
+ final List<Long> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
Assert.assertTrue(polled.isEmpty());
assertSizes(elementLimit, numBytes, q);
}
final int halfElements = elementLimit / 2;
{ // poll with timeout
- final List<Integer> polled = q.pollList(100, (i, timeout) -> {
+ final List<Long> polled = q.pollList(100, (i, timeout) -> {
if (i == halfElements) {
// simulate timeout
throw new TimeoutException("i=" + i);
@@ -171,7 +171,7 @@ public class TestDataQueue {
}
{ // poll the remaining elements
- final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+ final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
Assert.assertEquals(elementLimit - halfElements, polled.size());
for (int i = 0; i < polled.size(); i++) {
Assert.assertEquals(halfElements + i, polled.get(i).intValue());