You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2024/03/07 02:30:09 UTC

(hbase) branch branch-2 updated: HBASE-28359 Improve quota RateLimiter synchronization (#5683)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new c7eb3d848e3 HBASE-28359 Improve quota RateLimiter synchronization (#5683)
c7eb3d848e3 is described below

commit c7eb3d848e3907aa12ca65bba3b1f83554d0facd
Author: Ray Mattingly <rm...@gmail.com>
AuthorDate: Wed Mar 6 20:06:29 2024 -0500

    HBASE-28359 Improve quota RateLimiter synchronization (#5683)
    
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/quotas/DefaultOperationQuota.java |  5 ++
 .../hadoop/hbase/quotas/NoopOperationQuota.java    |  5 ++
 .../apache/hadoop/hbase/quotas/OperationQuota.java | 11 +++
 .../apache/hadoop/hbase/quotas/RateLimiter.java    | 31 +++++---
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      | 52 +++++++-------
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  4 +-
 .../hbase/quotas/TestBlockBytesScannedQuota.java   | 22 +++---
 .../hadoop/hbase/quotas/TestRateLimiter.java       | 83 +++++++++++-----------
 8 files changed, 124 insertions(+), 89 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 4b89e18a802..a4ff8b2a859 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -134,6 +134,11 @@ public class DefaultOperationQuota implements OperationQuota {
     return readAvailable;
   }
 
+  @Override
+  public long getReadConsumed() {
+    return readConsumed;
+  }
+
   @Override
   public void addGetResult(final Result result) {
     operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 71fc169d671..b64429d9adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,4 +68,9 @@ class NoopOperationQuota implements OperationQuota {
   public long getReadAvailable() {
     return Long.MAX_VALUE;
   }
+
+  @Override
+  public long getReadConsumed() {
+    return 0L;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index ffc3cd50825..bedad5e9867 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -80,4 +80,15 @@ public interface OperationQuota {
 
   /** Returns the number of bytes available to read to avoid exceeding the quota */
   long getReadAvailable();
+
+  /** Returns the number of bytes consumed from the quota by the operation */
+  long getReadConsumed();
+
+  /**
+   * Returns the maximum result size to be returned by the given operation. This is the greater of
+   * two numbers: the bytes available, or the bytes already consumed
+   */
+  default long getMaxResultSize() {
+    return Math.max(getReadAvailable(), getReadConsumed());
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index bda60ffa690..5c69ad5d6cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -23,12 +23,10 @@ import org.apache.yetus.audience.InterfaceStability;
 
 /**
  * Simple rate limiter. Usage Example: // At this point you have a unlimited resource limiter
- * RateLimiter limiter = new AverageIntervalRateLimiter(); or new FixedIntervalRateLimiter();
- * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (true) { // call canExecute
- * before performing resource consuming operation bool canExecute = limiter.canExecute(); // If
- * there are no available resources, wait until one is available if (!canExecute)
- * Thread.sleep(limiter.waitInterval()); // ...execute the work and consume the resource...
- * limiter.consume(); }
+ * RateLimiter limiter = new AverageIntervalRateLimiter(); // or new FixedIntervalRateLimiter();
+ * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec while (limiter.getWaitIntervalMs > 0)
+ * { // wait until waitInterval == 0 Thread.sleep(limiter.getWaitIntervalMs()); } // ...execute the
+ * work and consume the resource... limiter.consume();
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -135,10 +133,23 @@ public abstract class RateLimiter {
 
   /**
    * Is there at least one resource available to allow execution?
-   * @return true if there is at least one resource available, otherwise false
+   * @return the waitInterval to backoff, or 0 if execution is allowed
    */
-  public boolean canExecute() {
-    return canExecute(1);
+  public long getWaitIntervalMs() {
+    return getWaitIntervalMs(1);
+  }
+
+  /**
+   * Are there enough available resources to allow execution?
+   * @param amount the number of required resources, a non-negative number
+   * @return the waitInterval to backoff, or 0 if execution is allowed
+   */
+  public synchronized long getWaitIntervalMs(final long amount) {
+    assert amount >= 0;
+    if (!isAvailable(amount)) {
+      return waitInterval(amount);
+    }
+    return 0;
   }
 
   /**
@@ -146,7 +157,7 @@ public abstract class RateLimiter {
    * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
-  public synchronized boolean canExecute(final long amount) {
+  private boolean isAvailable(final long amount) {
     if (isBypass()) {
       return true;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index d7eb0e537a3..8ae2cae0188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -141,43 +141,47 @@ public class TimeBasedLimiter implements QuotaLimiter {
   public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
     long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
     throws RpcThrottlingException {
-    if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
-      RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
+    long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs);
+    if (waitInterval > 0) {
+      RpcThrottlingException.throwNumRequestsExceeded(waitInterval);
     }
-    if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
-      RpcThrottlingException.throwRequestSizeExceeded(
-        reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
+    waitInterval = reqSizeLimiter.getWaitIntervalMs(estimateWriteSize + estimateReadSize);
+    if (waitInterval > 0) {
+      RpcThrottlingException.throwRequestSizeExceeded(waitInterval);
     }
-    if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
-      RpcThrottlingException.throwRequestCapacityUnitExceeded(
-        reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
+    waitInterval = reqCapacityUnitLimiter
+      .getWaitIntervalMs(estimateWriteCapacityUnit + estimateReadCapacityUnit);
+    if (waitInterval > 0) {
+      RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval);
     }
 
     if (estimateWriteSize > 0) {
-      if (!writeReqsLimiter.canExecute(writeReqs)) {
-        RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
+      waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwNumWriteRequestsExceeded(waitInterval);
       }
-      if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
-        RpcThrottlingException
-          .throwWriteSizeExceeded(writeSizeLimiter.waitInterval(estimateWriteSize));
+      waitInterval = writeSizeLimiter.getWaitIntervalMs(estimateWriteSize);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwWriteSizeExceeded(waitInterval);
       }
-      if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
-        RpcThrottlingException.throwWriteCapacityUnitExceeded(
-          writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
+      waitInterval = writeCapacityUnitLimiter.getWaitIntervalMs(estimateWriteCapacityUnit);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval);
       }
     }
 
     if (estimateReadSize > 0) {
-      if (!readReqsLimiter.canExecute(readReqs)) {
-        RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
+      waitInterval = readReqsLimiter.getWaitIntervalMs(readReqs);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwNumReadRequestsExceeded(waitInterval);
       }
-      if (!readSizeLimiter.canExecute(estimateReadSize)) {
-        RpcThrottlingException
-          .throwReadSizeExceeded(readSizeLimiter.waitInterval(estimateReadSize));
+      waitInterval = readSizeLimiter.getWaitIntervalMs(estimateReadSize);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwReadSizeExceeded(waitInterval);
       }
-      if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
-        RpcThrottlingException.throwReadCapacityUnitExceeded(
-          readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
+      waitInterval = readCapacityUnitLimiter.getWaitIntervalMs(estimateReadCapacityUnit);
+      if (waitInterval > 0) {
+        RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval);
       }
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 0330e523438..fe077e9d078 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -773,7 +773,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
     // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
     // deferred/batched
     List<ClientProtos.Action> mutations = null;
-    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
     IOException sizeIOE = null;
     ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
       ResultOrException.newBuilder();
@@ -3639,7 +3639,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
     }
     RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
     // now let's do the real scan.
-    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
     RegionScanner scanner = rsh.s;
     // this is the limit of rows for this scan, if we the number of rows reach this value, we will
     // close the scanner.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
index d4ea24c7af2..7eb0b09336b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
@@ -103,9 +103,9 @@ public class TestBlockBytesScannedQuota {
     doPuts(10_000, FAMILY, QUALIFIER, table);
     TEST_UTIL.flush(TABLE_NAME);
 
-    // Add ~10 block/min limit
+    // Add ~10 block/sec limit
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_SIZE,
-      Math.round(10.1 * blockSize), TimeUnit.MINUTES));
+      Math.round(10.1 * blockSize), TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
 
     // should execute at max 10 requests
@@ -132,10 +132,10 @@ public class TestBlockBytesScannedQuota {
     doPuts(10_000, FAMILY, QUALIFIER, table);
     TEST_UTIL.flush(TABLE_NAME);
 
-    // Add 1 block/min limit.
+    // Add 1 block/sec limit.
     // This should only allow 1 scan per minute, because we estimate 1 block per scan
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
-      TimeUnit.MINUTES));
+      TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
     waitMinuteQuota();
 
@@ -148,9 +148,9 @@ public class TestBlockBytesScannedQuota {
     testTraffic(() -> doScans(100, table), 100, 0);
     testTraffic(() -> doScans(100, table), 100, 0);
 
-    // Add ~3 block/min limit. This should support >1 scans
+    // Add ~3 block/sec limit. This should support >1 scans
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
-      Math.round(3.1 * blockSize), TimeUnit.MINUTES));
+      Math.round(3.1 * blockSize), TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
 
     // should execute some requests, but not all
@@ -174,10 +174,10 @@ public class TestBlockBytesScannedQuota {
     doPuts(rowCount, FAMILY, QUALIFIER, table);
     TEST_UTIL.flush(TABLE_NAME);
 
-    // Add 1 block/min limit.
+    // Add 1 block/sec limit.
     // This should only allow 1 multiget per minute, because we estimate 1 block per multiget
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, blockSize,
-      TimeUnit.MINUTES));
+      TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
     waitMinuteQuota();
 
@@ -190,9 +190,9 @@ public class TestBlockBytesScannedQuota {
     testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
     testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, table), 100, 0);
 
-    // Add ~100 block/min limit
+    // Add ~100 block/sec limit
     admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE,
-      Math.round(100.1 * blockSize), TimeUnit.MINUTES));
+      Math.round(100.1 * blockSize), TimeUnit.SECONDS));
     triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
 
     // should execute approximately 10 batches of 10 requests
@@ -211,7 +211,7 @@ public class TestBlockBytesScannedQuota {
 
   private void testTraffic(Callable<Long> trafficCallable, long expectedSuccess, long marginOfError)
     throws Exception {
-    TEST_UTIL.waitFor(90_000, () -> {
+    TEST_UTIL.waitFor(5_000, () -> {
       long actualSuccess;
       try {
         actualSuccess = trafficCallable.call();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 49df937f7c5..ae9b96d7a6c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -18,8 +18,7 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotEquals;
 
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -71,7 +70,7 @@ public class TestRateLimiter {
     // consume all the available resources, one request at the time.
     // the wait interval should be 0
     for (int i = 0; i < (limit - 1); ++i) {
-      assertTrue(limiter.canExecute());
+      assertEquals(0, limiter.getWaitIntervalMs());
       limiter.consume();
       long waitInterval = limiter.waitInterval();
       assertEquals(0, waitInterval);
@@ -81,7 +80,7 @@ public class TestRateLimiter {
       // There is one resource available, so we should be able to
       // consume it without waiting.
       limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs);
-      assertTrue(limiter.canExecute());
+      assertEquals(0, limiter.getWaitIntervalMs());
       assertEquals(0, limiter.waitInterval());
       limiter.consume();
       // No more resources are available, we should wait for at least an interval.
@@ -94,7 +93,7 @@ public class TestRateLimiter {
       // artificially go into the past to prove that when too early we should fail.
       long temp = nowTs + 500;
       limiter.setNextRefillTime(limiter.getNextRefillTime() + temp);
-      assertFalse(limiter.canExecute());
+      assertNotEquals(0, limiter.getWaitIntervalMs());
       // Roll back the nextRefillTime set to continue further testing
       limiter.setNextRefillTime(limiter.getNextRefillTime() - temp);
     }
@@ -107,7 +106,7 @@ public class TestRateLimiter {
 
     // 10 resources are available, but we need to consume 20 resources
     // Verify that we have to wait at least 1.1sec to have 1 resource available
-    assertTrue(limiter.canExecute());
+    assertEquals(0, limiter.getWaitIntervalMs());
     limiter.consume(20);
     // We consumed twice the quota. Need to wait 1s to get back to 0, then another 100ms for the 1
     assertEquals(1100, limiter.waitInterval(1));
@@ -116,10 +115,10 @@ public class TestRateLimiter {
 
     // Verify that after 1sec we need to wait for another 0.1sec to get a resource available
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
-    assertFalse(limiter.canExecute(1));
+    assertNotEquals(0, limiter.getWaitIntervalMs(1));
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
     // We've waited the full 1.1sec, should now have 1 available
-    assertTrue(limiter.canExecute(1));
+    assertEquals(0, limiter.getWaitIntervalMs(1));
     assertEquals(0, limiter.waitInterval());
   }
 
@@ -138,7 +137,7 @@ public class TestRateLimiter {
       }
     };
     EnvironmentEdgeManager.injectEdge(edge);
-    assertTrue(limiter.canExecute());
+    assertEquals(0, limiter.getWaitIntervalMs());
     // 10 resources are available, but we need to consume 20 resources
     limiter.consume(20);
     // We over-consumed by 10. Since this is a fixed interval refill, where
@@ -149,10 +148,10 @@ public class TestRateLimiter {
 
     // Verify that after 1sec also no resource should be available
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
-    assertFalse(limiter.canExecute());
+    assertNotEquals(0, limiter.getWaitIntervalMs());
     // Verify that after total 2sec the 10 resource is available
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
-    assertTrue(limiter.canExecute());
+    assertEquals(0, limiter.getWaitIntervalMs());
     assertEquals(0, limiter.waitInterval());
   }
 
@@ -161,12 +160,12 @@ public class TestRateLimiter {
     RateLimiter limiter = new FixedIntervalRateLimiter();
     limiter.set(10, TimeUnit.SECONDS);
 
-    assertTrue(limiter.canExecute(10));
+    assertEquals(0, limiter.getWaitIntervalMs(10));
     limiter.consume(3);
     assertEquals(7, limiter.getAvailable());
-    assertFalse(limiter.canExecute(10));
+    assertNotEquals(0, limiter.getWaitIntervalMs(10));
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
-    assertTrue(limiter.canExecute(10));
+    assertEquals(0, limiter.getWaitIntervalMs(10));
     assertEquals(10, limiter.getAvailable());
   }
 
@@ -182,7 +181,7 @@ public class TestRateLimiter {
       limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
       for (int i = 0; i < 3; i++) {
         // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true
-        assertEquals(true, limiter.canExecute());
+        assertEquals(limiter.getWaitIntervalMs(), 0);
         limiter.consume();
       }
     }
@@ -237,7 +236,7 @@ public class TestRateLimiter {
     int count = 0;
     while ((request++) < rate) {
       limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate);
-      if (limiter.canExecute()) {
+      if (limiter.getWaitIntervalMs() == 0) {
         count++;
         limiter.consume();
       }
@@ -317,28 +316,28 @@ public class TestRateLimiter {
     assertEquals(limit, avgLimiter.getAvailable());
     assertEquals(limit, fixLimiter.getAvailable());
 
-    assertTrue(avgLimiter.canExecute(limit));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
     avgLimiter.consume(limit);
 
-    assertTrue(fixLimiter.canExecute(limit));
+    assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
     fixLimiter.consume(limit);
 
     // Make sure that available is Long.MAX_VALUE
-    assertTrue(limit == avgLimiter.getAvailable());
-    assertTrue(limit == fixLimiter.getAvailable());
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
 
     // after 100 millseconds, it should be able to execute limit as well
     testEdge.incValue(100);
 
-    assertTrue(avgLimiter.canExecute(limit));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
     avgLimiter.consume(limit);
 
-    assertTrue(fixLimiter.canExecute(limit));
+    assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
     fixLimiter.consume(limit);
 
     // Make sure that available is Long.MAX_VALUE
-    assertTrue(limit == avgLimiter.getAvailable());
-    assertTrue(limit == fixLimiter.getAvailable());
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
 
     EnvironmentEdgeManager.reset();
   }
@@ -358,39 +357,39 @@ public class TestRateLimiter {
     assertEquals(limit, avgLimiter.getAvailable());
     assertEquals(limit, fixLimiter.getAvailable());
 
-    assertTrue(avgLimiter.canExecute(limit / 2));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(limit / 2));
     avgLimiter.consume(limit / 2);
 
-    assertTrue(fixLimiter.canExecute(limit / 2));
+    assertEquals(0, fixLimiter.getWaitIntervalMs(limit / 2));
     fixLimiter.consume(limit / 2);
 
     // Make sure that available is whatever left
-    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
-    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+    assertEquals((limit - (limit / 2)), avgLimiter.getAvailable());
+    assertEquals((limit - (limit / 2)), fixLimiter.getAvailable());
 
     // after 100 millseconds, both should not be able to execute the limit
     testEdge.incValue(100);
 
-    assertFalse(avgLimiter.canExecute(limit));
-    assertFalse(fixLimiter.canExecute(limit));
+    assertNotEquals(0, avgLimiter.getWaitIntervalMs(limit));
+    assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit));
 
     // after 500 millseconds, average interval limiter should be able to execute the limit
     testEdge.incValue(500);
-    assertTrue(avgLimiter.canExecute(limit));
-    assertFalse(fixLimiter.canExecute(limit));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
+    assertNotEquals(0, fixLimiter.getWaitIntervalMs(limit));
 
     // Make sure that available is correct
-    assertTrue(limit == avgLimiter.getAvailable());
-    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals((limit - (limit / 2)), fixLimiter.getAvailable());
 
     // after 500 millseconds, both should be able to execute
     testEdge.incValue(500);
-    assertTrue(avgLimiter.canExecute(limit));
-    assertTrue(fixLimiter.canExecute(limit));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(limit));
+    assertEquals(0, fixLimiter.getWaitIntervalMs(limit));
 
     // Make sure that available is Long.MAX_VALUE
-    assertTrue(limit == avgLimiter.getAvailable());
-    assertTrue(limit == fixLimiter.getAvailable());
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
 
     EnvironmentEdgeManager.reset();
   }
@@ -413,19 +412,19 @@ public class TestRateLimiter {
     assertEquals(limit, avgLimiter.getAvailable());
 
     // The initial guess is that 100 bytes.
-    assertTrue(avgLimiter.canExecute(guessNumber));
+    assertEquals(0, avgLimiter.getWaitIntervalMs(guessNumber));
     avgLimiter.consume(guessNumber);
 
     // Make sure that available is whatever left
-    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+    assertEquals((limit - guessNumber), avgLimiter.getAvailable());
 
     // Manually set avil to simulate that another thread call canExecute().
     // It is simulated by consume().
     avgLimiter.consume(-80);
-    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+    assertEquals((limit - guessNumber + 80), avgLimiter.getAvailable());
 
     // Now thread1 compensates 80
     avgLimiter.consume(-80);
-    assertTrue(limit == avgLimiter.getAvailable());
+    assertEquals(limit, avgLimiter.getAvailable());
   }
 }