You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/09/27 10:38:59 UTC

[ratis] branch master updated: RATIS-1407. Integer overflow when set raft.server.log.queue.limit" to 4GB by Ozone (#505)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c7d4b6a  RATIS-1407. Integer overflow when set raft.server.log.queue.limit" to 4GB by Ozone (#505)
c7d4b6a is described below

commit c7d4b6a16009884146c839e1c0e1e27a005d9cec
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Mon Sep 27 18:38:55 2021 +0800

    RATIS-1407. Integer overflow when set raft.server.log.queue.limit" to 4GB by Ozone (#505)
---
 .../org/apache/ratis/util/DataBlockingQueue.java   |  6 ++--
 .../main/java/org/apache/ratis/util/DataQueue.java | 20 +++++++-------
 .../apache/ratis/server/RaftServerConfigKeys.java  |  5 ++++
 .../apache/ratis/util/TestDataBlockingQueue.java   | 16 +++++------
 .../java/org/apache/ratis/util/TestDataQueue.java  | 32 +++++++++++-----------
 5 files changed, 42 insertions(+), 37 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
index c71dea5..d6c14ab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
 
 /**
  * A queue for data elements
@@ -46,12 +46,12 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
   private final Condition notFull  = lock.newCondition();
   private final Condition notEmpty = lock.newCondition();
 
-  public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToIntFunction<E> getNumBytes) {
+  public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction<E> getNumBytes) {
     super(name, byteLimit, elementLimit, getNumBytes);
   }
 
   @Override
-  public int getNumBytes() {
+  public long getNumBytes() {
     try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
       return super.getNumBytes();
     }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
index 79d1d92..c4419d4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java
@@ -31,7 +31,7 @@ import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.ToIntFunction;
+import java.util.function.ToLongFunction;
 
 /**
  * A queue for data elements
@@ -45,18 +45,18 @@ public class DataQueue<E> {
   public static final Logger LOG = LoggerFactory.getLogger(DataQueue.class);
 
   private final Object name;
-  private final int byteLimit;
+  private final long byteLimit;
   private final int elementLimit;
-  private final ToIntFunction<E> getNumBytes;
+  private final ToLongFunction<E> getNumBytes;
 
   private final Queue<E> q;
 
-  private int numBytes = 0;
+  private long numBytes = 0;
 
   public DataQueue(Object name, SizeInBytes byteLimit, int elementLimit,
-      ToIntFunction<E> getNumBytes) {
+      ToLongFunction<E> getNumBytes) {
     this.name = name != null? name: this;
-    this.byteLimit = byteLimit.getSizeInt();
+    this.byteLimit = byteLimit.getSize();
     this.elementLimit = elementLimit;
     this.getNumBytes = getNumBytes;
     this.q = new ArrayDeque<>(elementLimit);
@@ -66,11 +66,11 @@ public class DataQueue<E> {
     return elementLimit;
   }
 
-  public int getByteLimit() {
+  public long getByteLimit() {
     return byteLimit;
   }
 
-  public int getNumBytes() {
+  public long getNumBytes() {
     return numBytes;
   }
 
@@ -98,7 +98,7 @@ public class DataQueue<E> {
     if (elementLimit > 0 && q.size() >= elementLimit) {
       return false;
     }
-    final int elementNumBytes = getNumBytes.applyAsInt(element);
+    final long elementNumBytes = getNumBytes.applyAsLong(element);
     Preconditions.assertTrue(elementNumBytes >= 0,
         () -> name + ": elementNumBytes = " + elementNumBytes + " < 0");
     if (byteLimit > 0) {
@@ -146,7 +146,7 @@ public class DataQueue<E> {
   public E poll() {
     final E polled = q.poll();
     if (polled != null) {
-      numBytes -= getNumBytes.applyAsInt(polled);
+      numBytes -= getNumBytes.applyAsLong(polled);
     }
     return polled;
   }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 586f359..7748413 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -183,9 +183,14 @@ public interface RaftServerConfigKeys {
       return getSizeInBytes(properties::getSizeInBytes,
           QUEUE_BYTE_LIMIT_KEY, QUEUE_BYTE_LIMIT_DEFAULT, getDefaultLog());
     }
+
+    @Deprecated
     static void setQueueByteLimit(RaftProperties properties, int queueSize) {
       setInt(properties::setInt, QUEUE_BYTE_LIMIT_KEY, queueSize, requireMin(1));
     }
+    static void setQueueByteLimit(RaftProperties properties, SizeInBytes byteLimit) {
+      setSizeInBytes(properties::set, QUEUE_BYTE_LIMIT_KEY, byteLimit, requireMin(1L));
+    }
 
     String PURGE_GAP_KEY = PREFIX + ".purge.gap";
     int PURGE_GAP_DEFAULT = 1024;
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
index 7017f97..df0dd2f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataBlockingQueue.java
@@ -23,15 +23,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class TestDataBlockingQueue {
   static final Logger LOG = LoggerFactory.getLogger(TestDataBlockingQueue.class);
 
   final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
   final int elementLimit = 10;
-  final DataBlockingQueue<Integer> q =
-      new DataBlockingQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+  final DataBlockingQueue<Long> q =
+      new DataBlockingQueue<>(null, byteLimit, elementLimit, Long::longValue);
 
   final TimeDuration slow = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
   final TimeDuration fast = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
@@ -56,27 +56,27 @@ public class TestDataBlockingQueue {
     runTestBlockingCalls(fast, slow, q);
   }
 
-  static void assertOfferPull(int offering, int polled, int elementLimit) {
+  static void assertOfferPull(long offering, long polled, long elementLimit) {
     Assert.assertTrue(offering >= polled);
     Assert.assertTrue(offering - polled <= elementLimit + 1);
   }
 
   static void runTestBlockingCalls(TimeDuration offerSleepTime, TimeDuration pollSleepTime,
-      DataBlockingQueue<Integer> q) throws Exception {
+      DataBlockingQueue<Long> q) throws Exception {
     Assert.assertTrue(q.isEmpty());
     ExitUtils.disableSystemExit();
     final int elementLimit = q.getElementLimit();
     final TimeDuration timeout = CollectionUtils.min(offerSleepTime, pollSleepTime);
 
-    final AtomicInteger offeringValue = new AtomicInteger();
-    final AtomicInteger polledValue = new AtomicInteger();
+    final AtomicLong offeringValue = new AtomicLong();
+    final AtomicLong polledValue = new AtomicLong();
     final int endValue = 30;
 
     final Thread pollThread = new Thread(() -> {
       try {
         for(; polledValue.get() < endValue;) {
           pollSleepTime.sleep();
-          final Integer polled = q.poll(timeout);
+          final Long polled = q.poll(timeout);
           if (polled != null) {
             Assert.assertEquals(polledValue.incrementAndGet(), polled.intValue());
             LOG.info("polled {}", polled);
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
index 08bc36d..846517e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestDataQueue.java
@@ -33,26 +33,26 @@ public class TestDataQueue {
     };
   }
 
-  static void assertSizes(int expectedNumElements, int expectedNumBytes, DataQueue<?> q) {
+  static void assertSizes(long expectedNumElements, long expectedNumBytes, DataQueue<?> q) {
     Assert.assertEquals(expectedNumElements, q.getNumElements());
     Assert.assertEquals(expectedNumBytes, q.getNumBytes());
   }
 
   final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
   final int elementLimit = 5;
-  final DataQueue<Integer> q = new DataQueue<>(null, byteLimit, elementLimit, Integer::intValue);
+  final DataQueue<Long> q = new DataQueue<Long>(null, byteLimit, elementLimit, Long::longValue);
 
   @Test(timeout = 1000)
   public void testElementLimit() {
     runTestElementLimit(q);
   }
 
-  static void runTestElementLimit(DataQueue<Integer> q) {
+  static void runTestElementLimit(DataQueue<Long> q) {
     assertSizes(0, 0, q);
 
     final int elementLimit = q.getElementLimit();
     int numBytes = 0;
-    for (int i = 0; i < elementLimit; i++) {
+    for (long i = 0; i < elementLimit; i++) {
       Assert.assertEquals(i, q.getNumElements());
       Assert.assertEquals(numBytes, q.getNumBytes());
       final boolean offered = q.offer(i);
@@ -61,13 +61,13 @@ public class TestDataQueue {
       assertSizes(i+1, numBytes, q);
     }
     {
-      final boolean offered = q.offer(0);
+      final boolean offered = q.offer(0L);
       Assert.assertFalse(offered);
       assertSizes(elementLimit, numBytes, q);
     }
 
     { // poll all elements
-      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
       Assert.assertEquals(elementLimit, polled.size());
       for (int i = 0; i < polled.size(); i++) {
         Assert.assertEquals(i, polled.get(i).intValue());
@@ -81,17 +81,17 @@ public class TestDataQueue {
     runTestByteLimit(q);
   }
 
-  static void runTestByteLimit(DataQueue<Integer> q) {
+  static void runTestByteLimit(DataQueue<Long> q) {
     assertSizes(0, 0, q);
 
-    final int byteLimit = q.getByteLimit();
+    final long byteLimit = q.getByteLimit();
     try {
       q.offer(byteLimit + 1);
       Assert.fail();
     } catch (IllegalStateException ignored) {
     }
 
-    final int halfBytes = byteLimit / 2;
+    final long halfBytes = byteLimit / 2;
     {
       final boolean offered = q.offer(halfBytes);
       Assert.assertTrue(offered);
@@ -111,19 +111,19 @@ public class TestDataQueue {
     }
 
     {
-      final boolean offered = q.offer(1);
+      final boolean offered = q.offer(1L);
       Assert.assertFalse(offered);
       assertSizes(2, byteLimit, q);
     }
 
     {
-      final boolean offered = q.offer(0);
+      final boolean offered = q.offer(0L);
       Assert.assertTrue(offered);
       assertSizes(3, byteLimit, q);
     }
 
     { // poll all elements
-      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
       Assert.assertEquals(3, polled.size());
       Assert.assertEquals(halfBytes, polled.get(0).intValue());
       Assert.assertEquals(halfBytes, polled.get(1).intValue());
@@ -138,7 +138,7 @@ public class TestDataQueue {
     assertSizes(0, 0, q);
 
     int numBytes = 0;
-    for (int i = 0; i < elementLimit; i++) {
+    for (long i = 0; i < elementLimit; i++) {
       Assert.assertEquals(i, q.getNumElements());
       Assert.assertEquals(numBytes, q.getNumBytes());
       final boolean offered = q.offer(i);
@@ -148,14 +148,14 @@ public class TestDataQueue {
     }
 
     { // poll with zero time
-      final List<Integer> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
+      final List<Long> polled = q.pollList(0, (i, timeout) -> i, getTimeoutHandler(false));
       Assert.assertTrue(polled.isEmpty());
       assertSizes(elementLimit, numBytes, q);
     }
 
     final int halfElements = elementLimit / 2;
     { // poll with timeout
-      final List<Integer> polled = q.pollList(100, (i, timeout) -> {
+      final List<Long> polled = q.pollList(100, (i, timeout) -> {
         if (i == halfElements) {
           // simulate timeout
           throw new TimeoutException("i=" + i);
@@ -171,7 +171,7 @@ public class TestDataQueue {
     }
 
     { // poll the remaining elements
-      final List<Integer> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
+      final List<Long> polled = q.pollList(100, (i, timeout) -> i, getTimeoutHandler(false));
       Assert.assertEquals(elementLimit - halfElements, polled.size());
       for (int i = 0; i < polled.size(); i++) {
         Assert.assertEquals(halfElements + i, polled.get(i).intValue());