You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/03/07 02:30:09 UTC
(hbase) branch branch-2 updated: HBASE-28359 Improve quota RateLimiter synchronization (#5683)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new c7eb3d848e3 HBASE-28359 Improve quota RateLimiter synchronization (#5683)
c7eb3d848e3 is described below
commit c7eb3d848e3907aa12ca65bba3b1f83554d0facd
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Wed Mar 6 20:06:29 2024 -0500
HBASE-28359 Improve quota RateLimiter synchronization (#5683)
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/quotas/DefaultOperationQuota.java | 5 ++
.../hadoop/hbase/quotas/NoopOperationQuota.java | 5 ++
.../apache/hadoop/hbase/quotas/OperationQuota.java | 11 +++
.../apache/hadoop/hbase/quotas/RateLimiter.java | 31 +++++---
.../hadoop/hbase/quotas/TimeBasedLimiter.java | 52 +++++++-------
.../hadoop/hbase/regionserver/RSRpcServices.java | 4 +-
.../hbase/quotas/TestBlockBytesScannedQuota.java | 22 +++---
.../hadoop/hbase/quotas/TestRateLimiter.java | 83 +++++++++++-----------
8 files changed, 124 insertions(+), 89 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 4b89e18a802..a4ff8b2a859 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -134,6 +134,11 @@ public class DefaultOperationQuota implements OperationQuota {
return readAvailable;
}
+ @Override
+ public long getReadConsumed() {
+ return readConsumed;
+ }
+
@Override
public void addGetResult(final Result result) {
operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 71fc169d671..b64429d9adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,4 +68,9 @@ class NoopOperationQuota implements OperationQuota {
public long getReadAvailable() {
return Long.MAX_VALUE;
}
+
+ @Override
+ public long getReadConsumed() {
+ return 0L;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index ffc3cd50825..bedad5e9867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -80,4 +80,15 @@ public interface OperationQuota {
/** Returns the number of bytes available to read to avoid exceeding the quota */
long getReadAvailable();
+
+ /** Returns the number of bytes consumed from the quota by the operation */
+ long getReadConsumed();
+
+ /**
+ * Returns the maximum result size to be returned by the given operation. This is the greater of
+ * two numbers: the bytes available, or the bytes already consumed
+ */
+ default long getMaxResultSize() {
+ return Math.max(getReadAvailable(), getReadConsumed());
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index bda60ffa690..5c69ad5d6cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -23,12 +23,10 @@ import org.apache.yetus.audience.InterfaceStability;
/**
* Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
- * RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter();
- * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute
- * before performing resource consuming operation bool canExecute = limiter.canExecute(); // If
- * there are no available resources, wait until one is available if (!canExecute)
- * Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource...
- * limiter.consume(); }
+ * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter();
+ * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0)
+ * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the
+ * work and consume the resource... limiter.consume();
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -135,10 +133,23 @@ public abstract class RateLimiter {
/**
* Is there at least one resource available to allow execution?
- * @return true if there is at least one resource available, otherwise false
+ * @return the waitInterval to backoff, or 0 if execution is allowed
*/
- public boolean canExecute() {
- return canExecute(1);
+ public long getWaitIntervalMs() {
+ return getWaitIntervalMs(1);
+ }
+
+ /**
+ * Are there enough available resources to allow execution?
+ * @param amount the number of required resources, a non-negative number
+ * @return the waitInterval to backoff, or 0 if execution is allowed
+ */
+ public synchronized long getWaitIntervalMs(final long amount) {
+ assert amount >= 0;
+ if (!isAvailable(amount)) {
+ return waitInterval(amount);
+ }
+ return 0;
}
/**
@@ -146,7 +157,7 @@ public abstract class RateLimiter {
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
- public synchronized boolean canExecute(final long amount) {
+ private boolean isAvailable(final long amount) {
if (isBypass()) {
return true;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index d7eb0e537a3..8ae2cae0188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -141,43 +141,47 @@ public class TimeBasedLimiter implements QuotaLimiter {
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
- if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
- RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
+ long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
}
- if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
- RpcThrottlingException.throwRequestSizeExceeded(
- reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
+ waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
}
- if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
- RpcThrottlingException.throwRequestCapacityUnitExceeded(
- reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
+ waitInterval = reqCapacityUnitLimiter
+ .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
}
if (estimateWriteSize > 0) {
- if (!writeReqsLimiter.canExecute(writeReqs)) {
- RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
+ waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
}
- if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
- RpcThrottlingException
- .throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize));
+ waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
}
- if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
- RpcThrottlingException.throwWriteCapacityUnitExceeded(
- writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
+ waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
}
}
if (estimateReadSize > 0) {
- if (!readReqsLimiter.canExecute(readReqs)) {
- RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
+ waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
}
- if (!readSizeLimiter.canExecute(estimateReadSize)) {
- RpcThrottlingException
- .throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize));
+ waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwReadSizeExceeded(waitInterval);
}
- if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
- RpcThrottlingException.throwReadCapacityUnitExceeded(
- readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
+ waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
+ if (waitInterval > 0) {
+ RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 0330e523438..fe077e9d078 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -773,7 +773,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
// doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
// deferred/batched
List<ClientProtos.Action> mutations = null;
- long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+ long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
IOException sizeIOE = null;
ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
ResultOrException.newBuilder();
@@ -3639,7 +3639,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
}
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
// now let's do the real scan.
- long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+ long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
RegionScanner scanner = rsh.s;
// this is the limit of rows for this scan, if we the number of rows reach this value, we will
// close the scanner.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
index d4ea24c7af2..7eb0b09336b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
@@ -103,9 +103,9 @@ public class TestBlockBytesScannedQuota {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);
- // Add ~10 block/min limit
+ // Add ~10 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
- Math.round(10.1 * blockSize), TimeUnit.MINUTES));
+ Math.round(10.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
// should execute at max 10 requests
@@ -132,10 +132,10 @@ public class TestBlockBytesScannedQuota {
doPuts(10_000, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);
- // Add 1 block/min limit.
+ // Add 1 block/sec limit.
// This should only allow 1 scan per minute, because we estimate 1 block per scan
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
- TimeUnit.MINUTES));
+ TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();
@@ -148,9 +148,9 @@ public class TestBlockBytesScannedQuota {
testTraffic(() -> doScans(100, table), 100, 0);
testTraffic(() -> doScans(100, table), 100, 0);
- // Add ~3 block/min limit. This should support >1 scans
+ // Add ~3 block/sec limit. This should support >1 scans
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
- Math.round(3.1 * blockSize), TimeUnit.MINUTES));
+ Math.round(3.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
// should execute some requests, but not all
@@ -174,10 +174,10 @@ public class TestBlockBytesScannedQuota {
doPuts(rowCount, FAMILY, QUALIFIER, table);
TEST_UTIL.flush(TABLE_NAME);
- // Add 1 block/min limit.
+ // Add 1 block/sec limit.
// This should only allow 1 multiget per minute, because we estimate 1 block per multiget
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
- TimeUnit.MINUTES));
+ TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();
@@ -190,9 +190,9 @@ public class TestBlockBytesScannedQuota {
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
- // Add ~100 block/min limit
+ // Add ~100 block/sec limit
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
- Math.round(100.1 * blockSize), TimeUnit.MINUTES));
+ Math.round(100.1 * blockSize), TimeUnit.SECONDS));
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
// should execute approximately 10 batches of 10 requests
@@ -211,7 +211,7 @@ public class TestBlockBytesScannedQuota {
private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
throws Exception {
- TEST_UTIL.waitFor(90_000, () -> {
+ TEST_UTIL.waitFor(5_000, () -> {
long actualSuccess;
try {
actualSuccess = trafficCallable.call();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 49df937f7c5..ae9b96d7a6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -18,8 +18,7 @@
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -71,7 +70,7 @@ public class TestRateLimiter {
// consume all the available resources, one request at the time.
// the wait interval should be 0
for (int i = 0; i < (limit - 1); ++i) {
- assertTrue(limiter.canExecute());
+ assertEquals(0, limiter.getWaitIntervalMs());
limiter.consume();
long waitInterval = limiter.waitInterval();
assertEquals(0, waitInterval);
@@ -81,7 +80,7 @@ public class TestRateLimiter {
// There is one resource available, so we should be able to
// consume it without waiting.
limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs);
- assertTrue(limiter.canExecute());
+ assertEquals(0, limiter.getWaitIntervalMs());
assertEquals(0, limiter.waitInterval());
limiter.consume();
// No more resources are available, we should wait for at least an interval.
@@ -94,7 +93,7 @@ public class TestRateLimiter {
// artificially go into the past to prove that when too early we should fail.
long temp = nowTs + 500;
limiter.setNextRefillTime(limiter.getNextRefillTime() + temp);
- assertFalse(limiter.canExecute());
+ assertNotEquals(0, limiter.getWaitIntervalMs());
// Roll back the nextRefillTime set to continue further testing
limiter.setNextRefillTime(limiter.getNextRefillTime() - temp);
}
@@ -107,7 +106,7 @@ public class TestRateLimiter {
// 10 resources are available, but we need to consume 20 resources
// Verify that we have to wait at least 1.1sec to have 1 resource available
- assertTrue(limiter.canExecute());
+ assertEquals(0, limiter.getWaitIntervalMs());
limiter.consume(20);
// We consumed twice the quota. Need to wait 1s to get back to 0, then another 100ms for the 1
assertEquals(1100, limiter.waitInterval(1));
@@ -116,10 +115,10 @@ public class TestRateLimiter {
// Verify that after 1sec we need to wait for another 0.1sec to get a resource available
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
- assertFalse(limiter.canExecute(1));
+ assertNotEquals(0, limiter.getWaitIntervalMs(1));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
// We've waited the full 1.1sec, should now have 1 available
- assertTrue(limiter.canExecute(1));
+ assertEquals(0, limiter.getWaitIntervalMs(1));
assertEquals(0, limiter.waitInterval());
}
@@ -138,7 +137,7 @@ public class TestRateLimiter {
}
};
EnvironmentEdgeManager.injectEdge(edge);
- assertTrue(limiter.canExecute());
+ assertEquals(0, limiter.getWaitIntervalMs());
// 10 resources are available, but we need to consume 20 resources
limiter.consume(20);
// We over-consumed by 10. Since this is a fixed interval refill, where
@@ -149,10 +148,10 @@ public class TestRateLimiter {
// Verify that after 1sec also no resource should be available
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
- assertFalse(limiter.canExecute());
+ assertNotEquals(0, limiter.getWaitIntervalMs());
// Verify that after total 2sec the 10 resource is available
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
- assertTrue(limiter.canExecute());
+ assertEquals(0, limiter.getWaitIntervalMs());
assertEquals(0, limiter.waitInterval());
}
@@ -161,12 +160,12 @@ public class TestRateLimiter {
RateLimiter limiter = new FixedIntervalRateLimiter();
limiter.set(10, TimeUnit.SECONDS);
- assertTrue(limiter.canExecute(10));
+ assertEquals(0, limiter.getWaitIntervalMs(10));
limiter.consume(3);
assertEquals(7, limiter.getAvailable());
- assertFalse(limiter.canExecute(10));
+ assertNotEquals(0, limiter.getWaitIntervalMs(10));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
- assertTrue(limiter.canExecute(10));
+ assertEquals(0, limiter.getWaitIntervalMs(10));
assertEquals(10, limiter.getAvailable());
}
@@ -182,7 +181,7 @@ public class TestRateLimiter {
limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
for (int i = 0; i < 3; i++) {
// 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true
- assertEquals(true, limiter.canExecute());
+ assertEquals(limiter.getWaitIntervalMs(), 0);
limiter.consume();
}
}
@@ -237,7 +236,7 @@ public class TestRateLimiter {
int count = 0;
while ((request++) < rate) {
limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate);
- if (limiter.canExecute()) {
+ if (limiter.getWaitIntervalMs() == 0) {
count++;
limiter.consume();
}
@@ -317,28 +316,28 @@ public class TestRateLimiter {
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
- assertTrue(avgLimiter.canExecute(limit));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
avgLimiter.consume(limit);
- assertTrue(fixLimiter.canExecute(limit));
+ assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
- assertTrue(limit == avgLimiter.getAvailable());
- assertTrue(limit == fixLimiter.getAvailable());
+ assertEquals(limit, avgLimiter.getAvailable());
+ assertEquals(limit, fixLimiter.getAvailable());
// after 100 millseconds, it should be able to execute limit as well
testEdge.incValue(100);
- assertTrue(avgLimiter.canExecute(limit));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
avgLimiter.consume(limit);
- assertTrue(fixLimiter.canExecute(limit));
+ assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
- assertTrue(limit == avgLimiter.getAvailable());
- assertTrue(limit == fixLimiter.getAvailable());
+ assertEquals(limit, avgLimiter.getAvailable());
+ assertEquals(limit, fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
@@ -358,39 +357,39 @@ public class TestRateLimiter {
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
- assertTrue(avgLimiter.canExecute(limit / 2));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(limit / 2));
avgLimiter.consume(limit / 2);
- assertTrue(fixLimiter.canExecute(limit / 2));
+ assertEquals(0, fixLimiter.getWaitIntervalMs(limit / 2));
fixLimiter.consume(limit / 2);
// Make sure that available is whatever left
- assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
- assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+ assertEquals((limit - (limit / 2)), avgLimiter.getAvailable());
+ assertEquals((limit - (limit / 2)), fixLimiter.getAvailable());
// after 100 millseconds, both should not be able to execute the limit
testEdge.incValue(100);
- assertFalse(avgLimiter.canExecute(limit));
- assertFalse(fixLimiter.canExecute(limit));
+ assertNotEquals(0, avgLimiter.getWaitIntervalMs(limit));
+ assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit));
// after 500 millseconds, average interval limiter should be able to execute the limit
testEdge.incValue(500);
- assertTrue(avgLimiter.canExecute(limit));
- assertFalse(fixLimiter.canExecute(limit));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
+ assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit));
// Make sure that available is correct
- assertTrue(limit == avgLimiter.getAvailable());
- assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+ assertEquals(limit, avgLimiter.getAvailable());
+ assertEquals((limit - (limit / 2)), fixLimiter.getAvailable());
// after 500 millseconds, both should be able to execute
testEdge.incValue(500);
- assertTrue(avgLimiter.canExecute(limit));
- assertTrue(fixLimiter.canExecute(limit));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
+ assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
// Make sure that available is Long.MAX_VALUE
- assertTrue(limit == avgLimiter.getAvailable());
- assertTrue(limit == fixLimiter.getAvailable());
+ assertEquals(limit, avgLimiter.getAvailable());
+ assertEquals(limit, fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
@@ -413,19 +412,19 @@ public class TestRateLimiter {
assertEquals(limit, avgLimiter.getAvailable());
// The initial guess is that 100 bytes.
- assertTrue(avgLimiter.canExecute(guessNumber));
+ assertEquals(0, avgLimiter.getWaitIntervalMs(guessNumber));
avgLimiter.consume(guessNumber);
// Make sure that available is whatever left
- assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+ assertEquals((limit - guessNumber), avgLimiter.getAvailable());
// Manually set avil to simulate that another thread call canExecute().
// It is simulated by consume().
avgLimiter.consume(-80);
- assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+ assertEquals((limit - guessNumber + 80), avgLimiter.getAvailable());
// Now thread1 compensates 80
avgLimiter.consume(-80);
- assertTrue(limit == avgLimiter.getAvailable());
+ assertEquals(limit, avgLimiter.getAvailable());
}
}