You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2016/10/10 21:33:56 UTC

[1/5] hbase git commit: HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Repository: hbase
Updated Branches:
  refs/heads/branch-1 acb1392b1 -> 66038b8c1
  refs/heads/branch-1.1 9620dc4e7 -> 47c8659d2
  refs/heads/branch-1.2 bd38f8dbf -> 55e989090
  refs/heads/branch-1.3 6a1598674 -> 387a08657
  refs/heads/master c930bc92f -> 341f049e7


HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/341f049e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/341f049e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/341f049e

Branch: refs/heads/master
Commit: 341f049e7734be0677ce38018dcb125b991469e7
Parents: c930bc9
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Mon Oct 10 14:06:28 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Oct 10 14:07:00 2016 -0700

----------------------------------------------------------------------
 .../quotas/AverageIntervalRateLimiter.java      |  20 ++-
 .../hbase/quotas/DefaultOperationQuota.java     |  46 ++-----
 .../hadoop/hbase/quotas/NoopOperationQuota.java |   5 -
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   |   9 --
 .../hadoop/hbase/quotas/OperationQuota.java     |  53 --------
 .../hadoop/hbase/quotas/QuotaLimiter.java       |   9 --
 .../apache/hadoop/hbase/quotas/RateLimiter.java |  38 +++++-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  13 --
 .../hadoop/hbase/quotas/TestRateLimiter.java    | 130 +++++++++++++++++++
 9 files changed, 191 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
index 75e6aea..9320d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
       return limit;
     }
 
-    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    long timeInterval = now - nextRefillTime;
+    long delta = 0;
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    if (timeInterval >= timeUnitInMillis) {
+      delta = limit;
+    } else if (timeInterval > 0) {
+      double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
+      delta = (long)r;
+    }
+
     if (delta > 0) {
       this.nextRefillTime = now;
-      return Math.min(limit, delta);
     }
-    return 0;
+
+    return delta;
   }
 
   @Override
@@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
     if (nextRefillTime == -1) {
       return 0;
     }
-    long timeUnitInMillis = super.getTimeUnitInMillis();
-    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+
+    double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
+    return (long)r;
   }
 
   // This method is for strictly testing purpose only

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 34c749e..9291286 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -40,8 +38,7 @@ public class DefaultOperationQuota implements OperationQuota {
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
-
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
+  private final long[] operationSize;
 
   public DefaultOperationQuota(final QuotaLimiter... limiters) {
     this(Arrays.asList(limiters));
@@ -52,6 +49,12 @@ public class DefaultOperationQuota implements OperationQuota {
    */
   public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
     this.limiters = limiters;
+    int size = OperationType.values().length;
+    operationSize = new long[size];
+
+    for (int i = 0; i < size; ++i) {
+      operationSize[i] = 0;
+    }
   }
 
   @Override
@@ -78,20 +81,11 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void close() {
-    // Calculate and set the average size of get, scan and mutate for the current operation
-    long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
-    long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
-    long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
-    for (final QuotaLimiter limiter: limiters) {
-      limiter.addOperationSize(OperationType.GET, getSize);
-      limiter.addOperationSize(OperationType.SCAN, scanSize);
-      limiter.addOperationSize(OperationType.MUTATE, mutationSize);
-    }
-
     // Adjust the quota consumed for the specified operation
-    long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
-    long readDiff = (avgOpSize.getOperationSize(OperationType.GET) +
-                     avgOpSize.getOperationSize(OperationType.SCAN)) - readConsumed;
+    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()] +
+        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
     for (final QuotaLimiter limiter: limiters) {
       if (writeDiff != 0) limiter.consumeWrite(writeDiff);
       if (readDiff != 0) limiter.consumeRead(readDiff);
@@ -110,33 +104,21 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void addGetResult(final Result result) {
-    avgOpSize.addGetResult(result);
+    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
   }
 
   @Override
   public void addScanResult(final List<Result> results) {
-    avgOpSize.addScanResult(results);
+    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
   }
 
   @Override
   public void addMutation(final Mutation mutation) {
-    avgOpSize.addMutation(mutation);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
+    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
   }
 
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
-      for (final QuotaLimiter limiter: limiters) {
-        long size = limiter.getAvgOperationSize(type);
-        if (size > 0) {
-          avgSize = size;
-          break;
-        }
-      }
       return avgSize * numReqs;
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index e67c7c0..71efc02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -76,9 +76,4 @@ class NoopOperationQuota implements OperationQuota {
   public long getWriteAvailable() {
     return Long.MAX_VALUE;
   }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 2273dc0..ee3ff21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -71,15 +71,6 @@ class NoopQuotaLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
-
-  @Override
   public String toString() {
     return "NoopQuotaLimiter";
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index b885ac9..16f39ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -34,56 +34,6 @@ public interface OperationQuota {
   public enum OperationType { MUTATE, GET, SCAN }
 
   /**
-   * Keeps track of the average data size of operations like get, scan, mutate
-   */
-  public class AvgOperationSize {
-    private final long[] sizeSum;
-    private final long[] count;
-
-    public AvgOperationSize() {
-      int size = OperationType.values().length;
-      sizeSum = new long[size];
-      count = new long[size];
-      for (int i = 0; i < size; ++i) {
-        sizeSum[i] = 0;
-        count[i] = 0;
-      }
-    }
-
-    public void addOperationSize(OperationType type, long size) {
-      if (size > 0) {
-        int index = type.ordinal();
-        sizeSum[index] += size;
-        count[index]++;
-      }
-    }
-
-    public long getAvgOperationSize(OperationType type) {
-      int index = type.ordinal();
-      return count[index] > 0 ? sizeSum[index] / count[index] : 0;
-    }
-
-    public long getOperationSize(OperationType type) {
-      return sizeSum[type.ordinal()];
-    }
-
-    public void addGetResult(final Result result) {
-      long size = QuotaUtil.calculateResultSize(result);
-      addOperationSize(OperationType.GET, size);
-    }
-
-    public void addScanResult(final List<Result> results) {
-      long size = QuotaUtil.calculateResultSize(results);
-      addOperationSize(OperationType.SCAN, size);
-    }
-
-    public void addMutation(final Mutation mutation) {
-      long size = QuotaUtil.calculateMutationSize(mutation);
-      addOperationSize(OperationType.MUTATE, size);
-    }
-  }
-
-  /**
    * Checks if it is possible to execute the specified operation.
    * The quota will be estimated based on the number of operations to perform
    * and the average size accumulated during time.
@@ -122,7 +72,4 @@ public interface OperationQuota {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index ffacbc0..a27211c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -68,13 +68,4 @@ public interface QuotaLimiter {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /**
-   * Add the average size of the specified operation type.
-   * The average will be used as estimate for the next operations.
-   */
-  void addOperationSize(OperationType type, long size);
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index dc527b6..28fbf0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -118,7 +118,15 @@ public abstract class RateLimiter {
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
     if (this.limit < other.limit) {
-      this.avail += (other.limit - this.limit);
+      // If avail is capped to this.limit, it will never overflow,
+      // otherwise, avail may overflow, just be careful here.
+      long diff = other.limit - this.limit;
+      if (this.avail <= Long.MAX_VALUE - diff) {
+        this.avail += diff;
+        this.avail = Math.min(this.avail, other.limit);
+      } else {
+        this.avail = other.limit;
+      }
     }
     this.limit = other.limit;
   }
@@ -149,10 +157,14 @@ public abstract class RateLimiter {
 
   /**
    * Are there enough available resources to allow execution?
-   * @param amount the number of required resources
+   * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
   public synchronized boolean canExecute(final long amount) {
+    if (isBypass()) {
+      return true;
+    }
+
     long refillAmount = refill(limit);
     if (refillAmount == 0 && avail < amount) {
       return false;
@@ -177,13 +189,27 @@ public abstract class RateLimiter {
   }
 
   /**
-   * consume amount available units.
+   * consume amount available units, amount could be a negative number
    * @param amount the number of units to consume
    */
   public synchronized void consume(final long amount) {
-    this.avail -= amount;
-    if (this.avail < 0) {
-      this.avail = 0;
+
+    if (isBypass()) {
+      return;
+    }
+
+    if (amount >= 0 ) {
+      this.avail -= amount;
+      if (this.avail < 0) {
+        this.avail = 0;
+      }
+    } else {
+      if (this.avail <= Long.MAX_VALUE + amount) {
+        this.avail -= amount;
+        this.avail = Math.min(this.avail, this.limit);
+      } else {
+        this.avail = this.limit;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index a12cbef..9bf6952 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Simple time based limiter that checks the quota Throttle
@@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -186,16 +183,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-    avgOpSize.addOperationSize(type, size);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");

http://git-wip-us.apache.org/repos/asf/hbase/blob/341f049e/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index ba8f405..e205f9b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -285,4 +286,133 @@ public class TestRateLimiter {
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
     assertEquals(60, limiter.refill(limiter.getLimit()));
   }
+
+  @Test
+  public void testUnconfiguredLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE;
+
+    // For unconfigured limiters, it is supposed to use as much as possible
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, it should be able to execute limit as well
+    testEdge.incValue(100);
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testExtremeLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE - 1;
+
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+    fixLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit / 2));
+    avgLimiter.consume(limit / 2);
+
+    assertTrue(fixLimiter.canExecute(limit / 2));
+    fixLimiter.consume(limit / 2);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, both should not be able to execute the limit
+    testEdge.incValue(100);
+
+    assertFalse(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // after 500 milliseconds, average interval limiter should be able to execute the limit
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // Make sure that available is correct
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 500 milliseconds, both should be able to execute
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertTrue(fixLimiter.canExecute(limit));
+
+    // Make sure that available is back at the full limit (Long.MAX_VALUE - 1)
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  /*
+   * This test case is tricky. Basically, it simulates the following events:
+   *           Thread-1                             Thread-2
+   * t0:  canExecute(100) and consume(100)
+   * t1:                                         canExecute(100), avail may be increased by 80
+   * t2:  consume(-80) as actual size is 20
+   * It will check if consume(-80) can handle overflow correctly.
+   */
+  @Test
+  public void testLimiterCompensationOverflow() throws InterruptedException {
+
+    long limit = Long.MAX_VALUE - 1;
+    long guessNumber = 100;
+
+    // Configure a limit close to Long.MAX_VALUE so that compensation could overflow avail
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+
+    // The initial guess is 100 bytes.
+    assertTrue(avgLimiter.canExecute(guessNumber));
+    avgLimiter.consume(guessNumber);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+
+    // Manually set avail to simulate another thread calling canExecute().
+    // It is simulated by consume().
+    avgLimiter.consume(-80);
+    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+
+    // Now thread1 compensates 80
+    avgLimiter.consume(-80);
+    assertTrue(limit == avgLimiter.getAvailable());
+  }
 }


[3/5] hbase git commit: HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Posted by mb...@apache.org.
HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/387a0865
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/387a0865
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/387a0865

Branch: refs/heads/branch-1.3
Commit: 387a08657c06e350e3d912e98486c8ae3a1fe025
Parents: 6a15986
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Mon Oct 10 14:12:03 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Oct 10 14:17:44 2016 -0700

----------------------------------------------------------------------
 .../quotas/AverageIntervalRateLimiter.java      |  20 ++-
 .../hbase/quotas/DefaultOperationQuota.java     |  47 +++----
 .../hadoop/hbase/quotas/NoopOperationQuota.java |   5 -
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   |   9 --
 .../hadoop/hbase/quotas/OperationQuota.java     |  59 +--------
 .../hadoop/hbase/quotas/QuotaLimiter.java       |   9 --
 .../apache/hadoop/hbase/quotas/RateLimiter.java |  38 +++++-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  13 --
 .../hadoop/hbase/quotas/TestRateLimiter.java    | 130 +++++++++++++++++++
 9 files changed, 196 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
index 75e6aea..9320d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
       return limit;
     }
 
-    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    long timeInterval = now - nextRefillTime;
+    long delta = 0;
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    if (timeInterval >= timeUnitInMillis) {
+      delta = limit;
+    } else if (timeInterval > 0) {
+      double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
+      delta = (long)r;
+    }
+
     if (delta > 0) {
       this.nextRefillTime = now;
-      return Math.min(limit, delta);
     }
-    return 0;
+
+    return delta;
   }
 
   @Override
@@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
     if (nextRefillTime == -1) {
       return 0;
     }
-    long timeUnitInMillis = super.getTimeUnitInMillis();
-    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+
+    double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
+    return (long)r;
   }
 
   // This method is for strictly testing purpose only

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 654e8fa..6caac74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota {
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
-
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
+  private final long[] operationSize;
 
   public DefaultOperationQuota(final QuotaLimiter... limiters) {
     this(Arrays.asList(limiters));
@@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota {
    */
   public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
     this.limiters = limiters;
+    int size = OperationType.values().length;
+    operationSize = new long[size];
+
+    for (int i = 0; i < size; ++i) {
+      operationSize[i] = 0;
+    }
   }
 
   @Override
@@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void close() {
-    // Calculate and set the average size of get, scan and mutate for the current operation
-    long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
-    long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
-    long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
-    for (final QuotaLimiter limiter : limiters) {
-      limiter.addOperationSize(OperationType.GET, getSize);
-      limiter.addOperationSize(OperationType.SCAN, scanSize);
-      limiter.addOperationSize(OperationType.MUTATE, mutationSize);
-    }
-
     // Adjust the quota consumed for the specified operation
-    long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
-    long readDiff =
-        (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize
-            .getOperationSize(OperationType.SCAN)) - readConsumed;
-    for (final QuotaLimiter limiter : limiters) {
+    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()] +
+        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    for (final QuotaLimiter limiter: limiters) {
       if (writeDiff != 0) limiter.consumeWrite(writeDiff);
       if (readDiff != 0) limiter.consumeRead(readDiff);
     }
@@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void addGetResult(final Result result) {
-    avgOpSize.addGetResult(result);
+    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
   }
 
   @Override
   public void addScanResult(final List<Result> results) {
-    avgOpSize.addScanResult(results);
+    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
   }
 
   @Override
   public void addMutation(final Mutation mutation) {
-    avgOpSize.addMutation(mutation);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
+    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
   }
 
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
-      for (final QuotaLimiter limiter : limiters) {
-        long size = limiter.getAvgOperationSize(type);
-        if (size > 0) {
-          avgSize = size;
-          break;
-        }
-      }
       return avgSize * numReqs;
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 2463ef7..9d37141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota {
   public long getWriteAvailable() {
     return Long.MAX_VALUE;
   }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 699fd1a..e72f3d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -63,15 +63,6 @@ final class NoopQuotaLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
-
-  @Override
   public String toString() {
     return "NoopQuotaLimiter";
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index 6010c13..ee38256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -29,58 +29,10 @@ public interface OperationQuota {
   }
 
   /**
-   * Keeps track of the average data size of operations like get, scan, mutate
-   */
-  public class AvgOperationSize {
-    private final long[] sizeSum;
-    private final long[] count;
-
-    public AvgOperationSize() {
-      int size = OperationType.values().length;
-      sizeSum = new long[size];
-      count = new long[size];
-      for (int i = 0; i < size; ++i) {
-        sizeSum[i] = 0;
-        count[i] = 0;
-      }
-    }
-
-    public void addOperationSize(OperationType type, long size) {
-      if (size > 0) {
-        int index = type.ordinal();
-        sizeSum[index] += size;
-        count[index]++;
-      }
-    }
-
-    public long getAvgOperationSize(OperationType type) {
-      int index = type.ordinal();
-      return count[index] > 0 ? sizeSum[index] / count[index] : 0;
-    }
-
-    public long getOperationSize(OperationType type) {
-      return sizeSum[type.ordinal()];
-    }
-
-    public void addGetResult(final Result result) {
-      long size = QuotaUtil.calculateResultSize(result);
-      addOperationSize(OperationType.GET, size);
-    }
-
-    public void addScanResult(final List<Result> results) {
-      long size = QuotaUtil.calculateResultSize(results);
-      addOperationSize(OperationType.SCAN, size);
-    }
-
-    public void addMutation(final Mutation mutation) {
-      long size = QuotaUtil.calculateMutationSize(mutation);
-      addOperationSize(OperationType.MUTATE, size);
-    }
-  }
-
-  /**
-   * Checks if it is possible to execute the specified operation. The quota will be estimated based
-   * on the number of operations to perform and the average size accumulated during time.
+   * Checks if it is possible to execute the specified operation.
+   * The quota will be estimated based on the number of operations to perform
+   * and the average size accumulated during time.
+   *
    * @param numWrites number of write operation that will be performed
    * @param numReads number of small-read operation that will be performed
    * @param numScans number of long-read operation that will be performed
@@ -114,7 +66,4 @@ public interface OperationQuota {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index ffacbc0..a27211c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -68,13 +68,4 @@ public interface QuotaLimiter {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /**
-   * Add the average size of the specified operation type.
-   * The average will be used as estimate for the next operations.
-   */
-  void addOperationSize(OperationType type, long size);
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index 3b407bd..356afd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -111,7 +111,15 @@ public abstract class RateLimiter {
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
     if (this.limit < other.limit) {
-      this.avail += (other.limit - this.limit);
+      // If avail is capped to this.limit, it will never overflow,
+      // otherwise, avail may overflow, just be careful here.
+      long diff = other.limit - this.limit;
+      if (this.avail <= Long.MAX_VALUE - diff) {
+        this.avail += diff;
+        this.avail = Math.min(this.avail, other.limit);
+      } else {
+        this.avail = other.limit;
+      }
     }
     this.limit = other.limit;
   }
@@ -142,10 +150,14 @@ public abstract class RateLimiter {
 
   /**
    * Are there enough available resources to allow execution?
-   * @param amount the number of required resources
+   * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
   public synchronized boolean canExecute(final long amount) {
+    if (isBypass()) {
+      return true;
+    }
+
     long refillAmount = refill(limit);
     if (refillAmount == 0 && avail < amount) {
       return false;
@@ -170,13 +182,27 @@ public abstract class RateLimiter {
   }
 
   /**
-   * consume amount available units.
+   * consume amount available units, amount could be a negative number
    * @param amount the number of units to consume
    */
   public synchronized void consume(final long amount) {
-    this.avail -= amount;
-    if (this.avail < 0) {
-      this.avail = 0;
+
+    if (isBypass()) {
+      return;
+    }
+
+    if (amount >= 0 ) {
+      this.avail -= amount;
+      if (this.avail < 0) {
+        this.avail = 0;
+      }
+    } else {
+      if (this.avail <= Long.MAX_VALUE + amount) {
+        this.avail -= amount;
+        this.avail = Math.min(this.avail, this.limit);
+      } else {
+        this.avail = this.limit;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index beb4147..1563878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Simple time based limiter that checks the quota Throttle
@@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -186,16 +183,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-    avgOpSize.addOperationSize(type, size);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");

http://git-wip-us.apache.org/repos/asf/hbase/blob/387a0865/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 4c5a658..1ca6643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -277,4 +278,133 @@ public class TestRateLimiter {
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
     assertEquals(60, limiter.refill(limiter.getLimit()));
   }
+
+  @Test
+  public void testUnconfiguredLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE;
+
+    // For unconfigured limiters, it is supposed to use as much as possible
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, it should be able to execute limit as well
+    testEdge.incValue(100);
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testExtremeLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE - 1;
+
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+    fixLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit / 2));
+    avgLimiter.consume(limit / 2);
+
+    assertTrue(fixLimiter.canExecute(limit / 2));
+    fixLimiter.consume(limit / 2);
+
+    // Make sure that available is whatever is left
+    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, neither should be able to execute the limit
+    testEdge.incValue(100);
+
+    assertFalse(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // after 500 milliseconds, average interval limiter should be able to execute the limit
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // Make sure that available is correct
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 500 milliseconds, both should be able to execute
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertTrue(fixLimiter.canExecute(limit));
+
+    // Make sure that available is back to the full limit (Long.MAX_VALUE - 1)
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  /*
+   * This test case is tricky. Basically, it simulates the following events:
+   *           Thread-1                             Thread-2
+   * t0:  canExecute(100) and consume(100)
+   * t1:                                         canExecute(100), avail may be increased by 80
+   * t2:  consume(-80) as actual size is 20
+   * It will check if consume(-80) can handle overflow correctly.
+   */
+  @Test
+  public void testLimiterCompensationOverflow() throws InterruptedException {
+
+    long limit = Long.MAX_VALUE - 1;
+    long guessNumber = 100;
+
+    // Configure the limiter with a limit close to Long.MAX_VALUE
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+
+    // The initial guess is 100 bytes.
+    assertTrue(avgLimiter.canExecute(guessNumber));
+    avgLimiter.consume(guessNumber);
+
+    // Make sure that available is whatever is left
+    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+
+    // Manually bump avail to simulate another thread calling canExecute().
+    // It is simulated by consume().
+    avgLimiter.consume(-80);
+    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+
+    // Now thread1 compensates 80
+    avgLimiter.consume(-80);
+    assertTrue(limit == avgLimiter.getAvailable());
+  }
 }


[5/5] hbase git commit: HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Posted by mb...@apache.org.
HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/47c8659d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/47c8659d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/47c8659d

Branch: refs/heads/branch-1.1
Commit: 47c8659d2ac6fd5bb715e30e317aa44ee574d8b0
Parents: 9620dc4
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Mon Oct 10 14:12:03 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Oct 10 14:29:11 2016 -0700

----------------------------------------------------------------------
 .../quotas/AverageIntervalRateLimiter.java      |  20 ++-
 .../hbase/quotas/DefaultOperationQuota.java     |  47 +++----
 .../hadoop/hbase/quotas/NoopOperationQuota.java |   5 -
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   |   9 --
 .../hadoop/hbase/quotas/OperationQuota.java     |  59 +--------
 .../hadoop/hbase/quotas/QuotaLimiter.java       |   9 --
 .../apache/hadoop/hbase/quotas/RateLimiter.java |  38 +++++-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  13 --
 .../hadoop/hbase/quotas/TestRateLimiter.java    | 130 +++++++++++++++++++
 9 files changed, 196 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
index 75e6aea..9320d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
       return limit;
     }
 
-    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    long timeInterval = now - nextRefillTime;
+    long delta = 0;
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    if (timeInterval >= timeUnitInMillis) {
+      delta = limit;
+    } else if (timeInterval > 0) {
+      double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
+      delta = (long)r;
+    }
+
     if (delta > 0) {
       this.nextRefillTime = now;
-      return Math.min(limit, delta);
     }
-    return 0;
+
+    return delta;
   }
 
   @Override
@@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
     if (nextRefillTime == -1) {
       return 0;
     }
-    long timeUnitInMillis = super.getTimeUnitInMillis();
-    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+
+    double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
+    return (long)r;
   }
 
   // This method is for strictly testing purpose only

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 654e8fa..6caac74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota {
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
-
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
+  private final long[] operationSize;
 
   public DefaultOperationQuota(final QuotaLimiter... limiters) {
     this(Arrays.asList(limiters));
@@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota {
    */
   public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
     this.limiters = limiters;
+    int size = OperationType.values().length;
+    operationSize = new long[size];
+
+    for (int i = 0; i < size; ++i) {
+      operationSize[i] = 0;
+    }
   }
 
   @Override
@@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void close() {
-    // Calculate and set the average size of get, scan and mutate for the current operation
-    long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
-    long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
-    long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
-    for (final QuotaLimiter limiter : limiters) {
-      limiter.addOperationSize(OperationType.GET, getSize);
-      limiter.addOperationSize(OperationType.SCAN, scanSize);
-      limiter.addOperationSize(OperationType.MUTATE, mutationSize);
-    }
-
     // Adjust the quota consumed for the specified operation
-    long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
-    long readDiff =
-        (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize
-            .getOperationSize(OperationType.SCAN)) - readConsumed;
-    for (final QuotaLimiter limiter : limiters) {
+    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()] +
+        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    for (final QuotaLimiter limiter: limiters) {
       if (writeDiff != 0) limiter.consumeWrite(writeDiff);
       if (readDiff != 0) limiter.consumeRead(readDiff);
     }
@@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void addGetResult(final Result result) {
-    avgOpSize.addGetResult(result);
+    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
   }
 
   @Override
   public void addScanResult(final List<Result> results) {
-    avgOpSize.addScanResult(results);
+    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
   }
 
   @Override
   public void addMutation(final Mutation mutation) {
-    avgOpSize.addMutation(mutation);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
+    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
   }
 
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
-      for (final QuotaLimiter limiter : limiters) {
-        long size = limiter.getAvgOperationSize(type);
-        if (size > 0) {
-          avgSize = size;
-          break;
-        }
-      }
       return avgSize * numReqs;
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 2463ef7..9d37141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota {
   public long getWriteAvailable() {
     return Long.MAX_VALUE;
   }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 699fd1a..e72f3d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -63,15 +63,6 @@ final class NoopQuotaLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
-
-  @Override
   public String toString() {
     return "NoopQuotaLimiter";
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index 6010c13..ee38256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -29,58 +29,10 @@ public interface OperationQuota {
   }
 
   /**
-   * Keeps track of the average data size of operations like get, scan, mutate
-   */
-  public class AvgOperationSize {
-    private final long[] sizeSum;
-    private final long[] count;
-
-    public AvgOperationSize() {
-      int size = OperationType.values().length;
-      sizeSum = new long[size];
-      count = new long[size];
-      for (int i = 0; i < size; ++i) {
-        sizeSum[i] = 0;
-        count[i] = 0;
-      }
-    }
-
-    public void addOperationSize(OperationType type, long size) {
-      if (size > 0) {
-        int index = type.ordinal();
-        sizeSum[index] += size;
-        count[index]++;
-      }
-    }
-
-    public long getAvgOperationSize(OperationType type) {
-      int index = type.ordinal();
-      return count[index] > 0 ? sizeSum[index] / count[index] : 0;
-    }
-
-    public long getOperationSize(OperationType type) {
-      return sizeSum[type.ordinal()];
-    }
-
-    public void addGetResult(final Result result) {
-      long size = QuotaUtil.calculateResultSize(result);
-      addOperationSize(OperationType.GET, size);
-    }
-
-    public void addScanResult(final List<Result> results) {
-      long size = QuotaUtil.calculateResultSize(results);
-      addOperationSize(OperationType.SCAN, size);
-    }
-
-    public void addMutation(final Mutation mutation) {
-      long size = QuotaUtil.calculateMutationSize(mutation);
-      addOperationSize(OperationType.MUTATE, size);
-    }
-  }
-
-  /**
-   * Checks if it is possible to execute the specified operation. The quota will be estimated based
-   * on the number of operations to perform and the average size accumulated during time.
+   * Checks if it is possible to execute the specified operation.
+   * The quota will be estimated based on the number of operations to perform
+   * and the average size accumulated during time.
+   *
    * @param numWrites number of write operation that will be performed
    * @param numReads number of small-read operation that will be performed
    * @param numScans number of long-read operation that will be performed
@@ -114,7 +66,4 @@ public interface OperationQuota {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index ffacbc0..a27211c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -68,13 +68,4 @@ public interface QuotaLimiter {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /**
-   * Add the average size of the specified operation type.
-   * The average will be used as estimate for the next operations.
-   */
-  void addOperationSize(OperationType type, long size);
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index 45089e8..c9c93d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -107,7 +107,15 @@ public abstract class RateLimiter {
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
     if (this.limit < other.limit) {
-      this.avail += (other.limit - this.limit);
+      // If avail is capped to this.limit, it will never overflow,
+      // otherwise, avail may overflow, just be careful here.
+      long diff = other.limit - this.limit;
+      if (this.avail <= Long.MAX_VALUE - diff) {
+        this.avail += diff;
+        this.avail = Math.min(this.avail, other.limit);
+      } else {
+        this.avail = other.limit;
+      }
     }
     this.limit = other.limit;
   }
@@ -138,10 +146,14 @@ public abstract class RateLimiter {
 
   /**
    * Are there enough available resources to allow execution?
-   * @param amount the number of required resources
+   * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
   public synchronized boolean canExecute(final long amount) {
+    if (isBypass()) {
+      return true;
+    }
+
     long refillAmount = refill(limit);
     if (refillAmount == 0 && avail < amount) {
       return false;
@@ -166,13 +178,27 @@ public abstract class RateLimiter {
   }
 
   /**
-   * consume amount available units.
+   * consume amount available units, amount could be a negative number
    * @param amount the number of units to consume
    */
   public synchronized void consume(final long amount) {
-    this.avail -= amount;
-    if (this.avail < 0) {
-      this.avail = 0;
+
+    if (isBypass()) {
+      return;
+    }
+
+    if (amount >= 0 ) {
+      this.avail -= amount;
+      if (this.avail < 0) {
+        this.avail = 0;
+      }
+    } else {
+      if (this.avail <= Long.MAX_VALUE + amount) {
+        this.avail -= amount;
+        this.avail = Math.min(this.avail, this.limit);
+      } else {
+        this.avail = this.limit;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index beb4147..1563878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Simple time based limiter that checks the quota Throttle
@@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -186,16 +183,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-    avgOpSize.addOperationSize(type, size);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");

http://git-wip-us.apache.org/repos/asf/hbase/blob/47c8659d/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 4c5a658..1ca6643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -277,4 +278,133 @@ public class TestRateLimiter {
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
     assertEquals(60, limiter.refill(limiter.getLimit()));
   }
+
+  @Test
+  public void testUnconfiguredLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE;
+
+    // For unconfigured limiters, it is supposed to use as much as possible
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, it should be able to execute limit as well
+    testEdge.incValue(100);
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testExtremeLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE - 1;
+
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+    fixLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit / 2));
+    avgLimiter.consume(limit / 2);
+
+    assertTrue(fixLimiter.canExecute(limit / 2));
+    fixLimiter.consume(limit / 2);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, neither should be able to execute the limit
+    testEdge.incValue(100);
+
+    assertFalse(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // after 500 milliseconds, average interval limiter should be able to execute the limit
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // Make sure that available is correct
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 500 milliseconds, both should be able to execute
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertTrue(fixLimiter.canExecute(limit));
+
+    // Make sure that available is back at the limit (Long.MAX_VALUE - 1)
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  /*
+   * This test case is tricky. Basically, it simulates the following events:
+   *           Thread-1                             Thread-2
+   * t0:  canExecute(100) and consume(100)
+   * t1:                                         canExecute(100), avail may be increased by 80
+   * t2:  consume(-80) as actual size is 20
+   * It will check if consume(-80) can handle overflow correctly.
+   */
+  @Test
+  public void testLimiterCompensationOverflow() throws InterruptedException {
+
+    long limit = Long.MAX_VALUE - 1;
+    long guessNumber = 100;
+
+    // Configure the limiter with a limit just below Long.MAX_VALUE
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+
+    // The initial guess is 100 bytes.
+    assertTrue(avgLimiter.canExecute(guessNumber));
+    avgLimiter.consume(guessNumber);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+
+    // Manually bump avail to simulate another thread calling canExecute().
+    // It is simulated by consume().
+    avgLimiter.consume(-80);
+    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+
+    // Now thread1 compensates 80
+    avgLimiter.consume(-80);
+    assertTrue(limit == avgLimiter.getAvailable());
+  }
 }


[4/5] hbase git commit: HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Posted by mb...@apache.org.
HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/55e98909
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/55e98909
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/55e98909

Branch: refs/heads/branch-1.2
Commit: 55e989090df034110dfdde4b7803c07ff92b8321
Parents: bd38f8d
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Mon Oct 10 14:12:03 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Oct 10 14:22:18 2016 -0700

----------------------------------------------------------------------
 .../quotas/AverageIntervalRateLimiter.java      |  20 ++-
 .../hbase/quotas/DefaultOperationQuota.java     |  47 +++----
 .../hadoop/hbase/quotas/NoopOperationQuota.java |   5 -
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   |   9 --
 .../hadoop/hbase/quotas/OperationQuota.java     |  59 +--------
 .../hadoop/hbase/quotas/QuotaLimiter.java       |   9 --
 .../apache/hadoop/hbase/quotas/RateLimiter.java |  38 +++++-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  13 --
 .../hadoop/hbase/quotas/TestRateLimiter.java    | 130 +++++++++++++++++++
 9 files changed, 196 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
index 75e6aea..9320d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
       return limit;
     }
 
-    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    long timeInterval = now - nextRefillTime;
+    long delta = 0;
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    if (timeInterval >= timeUnitInMillis) {
+      delta = limit;
+    } else if (timeInterval > 0) {
+      double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
+      delta = (long)r;
+    }
+
     if (delta > 0) {
       this.nextRefillTime = now;
-      return Math.min(limit, delta);
     }
-    return 0;
+
+    return delta;
   }
 
   @Override
@@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
     if (nextRefillTime == -1) {
       return 0;
     }
-    long timeUnitInMillis = super.getTimeUnitInMillis();
-    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+
+    double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
+    return (long)r;
   }
 
   // This method is for strictly testing purpose only

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 654e8fa..6caac74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota {
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
-
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
+  private final long[] operationSize;
 
   public DefaultOperationQuota(final QuotaLimiter... limiters) {
     this(Arrays.asList(limiters));
@@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota {
    */
   public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
     this.limiters = limiters;
+    int size = OperationType.values().length;
+    operationSize = new long[size];
+
+    for (int i = 0; i < size; ++i) {
+      operationSize[i] = 0;
+    }
   }
 
   @Override
@@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void close() {
-    // Calculate and set the average size of get, scan and mutate for the current operation
-    long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
-    long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
-    long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
-    for (final QuotaLimiter limiter : limiters) {
-      limiter.addOperationSize(OperationType.GET, getSize);
-      limiter.addOperationSize(OperationType.SCAN, scanSize);
-      limiter.addOperationSize(OperationType.MUTATE, mutationSize);
-    }
-
     // Adjust the quota consumed for the specified operation
-    long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
-    long readDiff =
-        (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize
-            .getOperationSize(OperationType.SCAN)) - readConsumed;
-    for (final QuotaLimiter limiter : limiters) {
+    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()] +
+        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    for (final QuotaLimiter limiter: limiters) {
       if (writeDiff != 0) limiter.consumeWrite(writeDiff);
       if (readDiff != 0) limiter.consumeRead(readDiff);
     }
@@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void addGetResult(final Result result) {
-    avgOpSize.addGetResult(result);
+    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
   }
 
   @Override
   public void addScanResult(final List<Result> results) {
-    avgOpSize.addScanResult(results);
+    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
   }
 
   @Override
   public void addMutation(final Mutation mutation) {
-    avgOpSize.addMutation(mutation);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
+    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
   }
 
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
-      for (final QuotaLimiter limiter : limiters) {
-        long size = limiter.getAvgOperationSize(type);
-        if (size > 0) {
-          avgSize = size;
-          break;
-        }
-      }
       return avgSize * numReqs;
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 2463ef7..9d37141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota {
   public long getWriteAvailable() {
     return Long.MAX_VALUE;
   }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 699fd1a..e72f3d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -63,15 +63,6 @@ final class NoopQuotaLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
-
-  @Override
   public String toString() {
     return "NoopQuotaLimiter";
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index 6010c13..ee38256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -29,58 +29,10 @@ public interface OperationQuota {
   }
 
   /**
-   * Keeps track of the average data size of operations like get, scan, mutate
-   */
-  public class AvgOperationSize {
-    private final long[] sizeSum;
-    private final long[] count;
-
-    public AvgOperationSize() {
-      int size = OperationType.values().length;
-      sizeSum = new long[size];
-      count = new long[size];
-      for (int i = 0; i < size; ++i) {
-        sizeSum[i] = 0;
-        count[i] = 0;
-      }
-    }
-
-    public void addOperationSize(OperationType type, long size) {
-      if (size > 0) {
-        int index = type.ordinal();
-        sizeSum[index] += size;
-        count[index]++;
-      }
-    }
-
-    public long getAvgOperationSize(OperationType type) {
-      int index = type.ordinal();
-      return count[index] > 0 ? sizeSum[index] / count[index] : 0;
-    }
-
-    public long getOperationSize(OperationType type) {
-      return sizeSum[type.ordinal()];
-    }
-
-    public void addGetResult(final Result result) {
-      long size = QuotaUtil.calculateResultSize(result);
-      addOperationSize(OperationType.GET, size);
-    }
-
-    public void addScanResult(final List<Result> results) {
-      long size = QuotaUtil.calculateResultSize(results);
-      addOperationSize(OperationType.SCAN, size);
-    }
-
-    public void addMutation(final Mutation mutation) {
-      long size = QuotaUtil.calculateMutationSize(mutation);
-      addOperationSize(OperationType.MUTATE, size);
-    }
-  }
-
-  /**
-   * Checks if it is possible to execute the specified operation. The quota will be estimated based
-   * on the number of operations to perform and the average size accumulated during time.
+   * Checks if it is possible to execute the specified operation.
+   * The quota will be estimated based on the number of operations to perform
+   * and the average size accumulated during time.
+   *
    * @param numWrites number of write operation that will be performed
    * @param numReads number of small-read operation that will be performed
    * @param numScans number of long-read operation that will be performed
@@ -114,7 +66,4 @@ public interface OperationQuota {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index ffacbc0..a27211c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -68,13 +68,4 @@ public interface QuotaLimiter {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /**
-   * Add the average size of the specified operation type.
-   * The average will be used as estimate for the next operations.
-   */
-  void addOperationSize(OperationType type, long size);
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index 3b407bd..356afd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -111,7 +111,15 @@ public abstract class RateLimiter {
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
     if (this.limit < other.limit) {
-      this.avail += (other.limit - this.limit);
+      // If avail is capped to this.limit, it will never overflow,
+      // otherwise, avail may overflow, just be careful here.
+      long diff = other.limit - this.limit;
+      if (this.avail <= Long.MAX_VALUE - diff) {
+        this.avail += diff;
+        this.avail = Math.min(this.avail, other.limit);
+      } else {
+        this.avail = other.limit;
+      }
     }
     this.limit = other.limit;
   }
@@ -142,10 +150,14 @@ public abstract class RateLimiter {
 
   /**
    * Are there enough available resources to allow execution?
-   * @param amount the number of required resources
+   * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
   public synchronized boolean canExecute(final long amount) {
+    if (isBypass()) {
+      return true;
+    }
+
     long refillAmount = refill(limit);
     if (refillAmount == 0 && avail < amount) {
       return false;
@@ -170,13 +182,27 @@ public abstract class RateLimiter {
   }
 
   /**
-   * consume amount available units.
+   * consume amount available units, amount could be a negative number
    * @param amount the number of units to consume
    */
   public synchronized void consume(final long amount) {
-    this.avail -= amount;
-    if (this.avail < 0) {
-      this.avail = 0;
+
+    if (isBypass()) {
+      return;
+    }
+
+    if (amount >= 0 ) {
+      this.avail -= amount;
+      if (this.avail < 0) {
+        this.avail = 0;
+      }
+    } else {
+      if (this.avail <= Long.MAX_VALUE + amount) {
+        this.avail -= amount;
+        this.avail = Math.min(this.avail, this.limit);
+      } else {
+        this.avail = this.limit;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index beb4147..1563878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Simple time based limiter that checks the quota Throttle
@@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -186,16 +183,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-    avgOpSize.addOperationSize(type, size);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");

http://git-wip-us.apache.org/repos/asf/hbase/blob/55e98909/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 4c5a658..1ca6643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -277,4 +278,133 @@ public class TestRateLimiter {
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
     assertEquals(60, limiter.refill(limiter.getLimit()));
   }
+
+  @Test
+  public void testUnconfiguredLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE;
+
+    // For unconfigured limiters, it is supposed to use as much as possible
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, it should be able to execute limit as well
+    testEdge.incValue(100);
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testExtremeLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE - 1;
+
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+    fixLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit / 2));
+    avgLimiter.consume(limit / 2);
+
+    assertTrue(fixLimiter.canExecute(limit / 2));
+    fixLimiter.consume(limit / 2);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, neither should be able to execute the limit
+    testEdge.incValue(100);
+
+    assertFalse(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // after 500 milliseconds, average interval limiter should be able to execute the limit
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // Make sure that available is correct
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 500 milliseconds, both should be able to execute
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertTrue(fixLimiter.canExecute(limit));
+
+    // Make sure that available is back at the limit (Long.MAX_VALUE - 1)
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  /*
+   * This test case is tricky. Basically, it simulates the following events:
+   *           Thread-1                             Thread-2
+   * t0:  canExecute(100) and consume(100)
+   * t1:                                         canExecute(100), avail may be increased by 80
+   * t2:  consume(-80) as actual size is 20
+   * It will check if consume(-80) can handle overflow correctly.
+   */
+  @Test
+  public void testLimiterCompensationOverflow() throws InterruptedException {
+
+    long limit = Long.MAX_VALUE - 1;
+    long guessNumber = 100;
+
+    // Configure the limiter with a limit just below Long.MAX_VALUE
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+
+    // The initial guess is 100 bytes.
+    assertTrue(avgLimiter.canExecute(guessNumber));
+    avgLimiter.consume(guessNumber);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+
+    // Manually bump avail to simulate another thread calling canExecute().
+    // It is simulated by consume().
+    avgLimiter.consume(-80);
+    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+
+    // Now thread1 compensates 80
+    avgLimiter.consume(-80);
+    assertTrue(limit == avgLimiter.getAvailable());
+  }
 }


[2/5] hbase git commit: HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Posted by mb...@apache.org.
HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <ma...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/66038b8c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/66038b8c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/66038b8c

Branch: refs/heads/branch-1
Commit: 66038b8c1a4ad775fdb16c7a70ceb3d86ff149bb
Parents: acb1392
Author: Huaxiang Sun <hs...@cloudera.com>
Authored: Mon Oct 10 14:12:03 2016 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Mon Oct 10 14:12:22 2016 -0700

----------------------------------------------------------------------
 .../quotas/AverageIntervalRateLimiter.java      |  20 ++-
 .../hbase/quotas/DefaultOperationQuota.java     |  47 +++----
 .../hadoop/hbase/quotas/NoopOperationQuota.java |   5 -
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java   |   9 --
 .../hadoop/hbase/quotas/OperationQuota.java     |  59 +--------
 .../hadoop/hbase/quotas/QuotaLimiter.java       |   9 --
 .../apache/hadoop/hbase/quotas/RateLimiter.java |  38 +++++-
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  13 --
 .../hadoop/hbase/quotas/TestRateLimiter.java    | 130 +++++++++++++++++++
 9 files changed, 196 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
index 75e6aea..9320d7c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
       return limit;
     }
 
-    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    long timeInterval = now - nextRefillTime;
+    long delta = 0;
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    if (timeInterval >= timeUnitInMillis) {
+      delta = limit;
+    } else if (timeInterval > 0) {
+      double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
+      delta = (long)r;
+    }
+
     if (delta > 0) {
       this.nextRefillTime = now;
-      return Math.min(limit, delta);
     }
-    return 0;
+
+    return delta;
   }
 
   @Override
@@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
     if (nextRefillTime == -1) {
       return 0;
     }
-    long timeUnitInMillis = super.getTimeUnitInMillis();
-    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+
+    double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
+    return (long)r;
   }
 
   // This method is for strictly testing purpose only

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 654e8fa..6caac74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota {
   private long readAvailable = 0;
   private long writeConsumed = 0;
   private long readConsumed = 0;
-
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
+  private final long[] operationSize;
 
   public DefaultOperationQuota(final QuotaLimiter... limiters) {
     this(Arrays.asList(limiters));
@@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota {
    */
   public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
     this.limiters = limiters;
+    int size = OperationType.values().length;
+    operationSize = new long[size];
+
+    for (int i = 0; i < size; ++i) {
+      operationSize[i] = 0;
+    }
   }
 
   @Override
@@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void close() {
-    // Calculate and set the average size of get, scan and mutate for the current operation
-    long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
-    long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
-    long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
-    for (final QuotaLimiter limiter : limiters) {
-      limiter.addOperationSize(OperationType.GET, getSize);
-      limiter.addOperationSize(OperationType.SCAN, scanSize);
-      limiter.addOperationSize(OperationType.MUTATE, mutationSize);
-    }
-
     // Adjust the quota consumed for the specified operation
-    long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
-    long readDiff =
-        (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize
-            .getOperationSize(OperationType.SCAN)) - readConsumed;
-    for (final QuotaLimiter limiter : limiters) {
+    long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
+    long readDiff = operationSize[OperationType.GET.ordinal()] +
+        operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    for (final QuotaLimiter limiter: limiters) {
       if (writeDiff != 0) limiter.consumeWrite(writeDiff);
       if (readDiff != 0) limiter.consumeRead(readDiff);
     }
@@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota {
 
   @Override
   public void addGetResult(final Result result) {
-    avgOpSize.addGetResult(result);
+    operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
   }
 
   @Override
   public void addScanResult(final List<Result> results) {
-    avgOpSize.addScanResult(results);
+    operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
   }
 
   @Override
   public void addMutation(final Mutation mutation) {
-    avgOpSize.addMutation(mutation);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
+    operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
   }
 
   private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
     if (numReqs > 0) {
-      for (final QuotaLimiter limiter : limiters) {
-        long size = limiter.getAvgOperationSize(type);
-        if (size > 0) {
-          avgSize = size;
-          break;
-        }
-      }
       return avgSize * numReqs;
     }
     return 0;

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
index 2463ef7..9d37141 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java
@@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota {
   public long getWriteAvailable() {
     return Long.MAX_VALUE;
   }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index 699fd1a..e72f3d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -63,15 +63,6 @@ final class NoopQuotaLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return -1;
-  }
-
-  @Override
   public String toString() {
     return "NoopQuotaLimiter";
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index 6010c13..ee38256 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -29,58 +29,10 @@ public interface OperationQuota {
   }
 
   /**
-   * Keeps track of the average data size of operations like get, scan, mutate
-   */
-  public class AvgOperationSize {
-    private final long[] sizeSum;
-    private final long[] count;
-
-    public AvgOperationSize() {
-      int size = OperationType.values().length;
-      sizeSum = new long[size];
-      count = new long[size];
-      for (int i = 0; i < size; ++i) {
-        sizeSum[i] = 0;
-        count[i] = 0;
-      }
-    }
-
-    public void addOperationSize(OperationType type, long size) {
-      if (size > 0) {
-        int index = type.ordinal();
-        sizeSum[index] += size;
-        count[index]++;
-      }
-    }
-
-    public long getAvgOperationSize(OperationType type) {
-      int index = type.ordinal();
-      return count[index] > 0 ? sizeSum[index] / count[index] : 0;
-    }
-
-    public long getOperationSize(OperationType type) {
-      return sizeSum[type.ordinal()];
-    }
-
-    public void addGetResult(final Result result) {
-      long size = QuotaUtil.calculateResultSize(result);
-      addOperationSize(OperationType.GET, size);
-    }
-
-    public void addScanResult(final List<Result> results) {
-      long size = QuotaUtil.calculateResultSize(results);
-      addOperationSize(OperationType.SCAN, size);
-    }
-
-    public void addMutation(final Mutation mutation) {
-      long size = QuotaUtil.calculateMutationSize(mutation);
-      addOperationSize(OperationType.MUTATE, size);
-    }
-  }
-
-  /**
-   * Checks if it is possible to execute the specified operation. The quota will be estimated based
-   * on the number of operations to perform and the average size accumulated during time.
+   * Checks if it is possible to execute the specified operation.
+   * The quota will be estimated based on the number of operations to perform
+   * and the average size accumulated during time.
+   *
    * @param numWrites number of write operation that will be performed
    * @param numReads number of small-read operation that will be performed
    * @param numScans number of long-read operation that will be performed
@@ -114,7 +66,4 @@ public interface OperationQuota {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index ffacbc0..a27211c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -68,13 +68,4 @@ public interface QuotaLimiter {
 
   /** @return the number of bytes available to write to avoid exceeding the quota */
   long getWriteAvailable();
-
-  /**
-   * Add the average size of the specified operation type.
-   * The average will be used as estimate for the next operations.
-   */
-  void addOperationSize(OperationType type, long size);
-
-  /** @return the average data size of the specified operation */
-  long getAvgOperationSize(OperationType type);
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index 3b407bd..356afd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -111,7 +111,15 @@ public abstract class RateLimiter {
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
     if (this.limit < other.limit) {
-      this.avail += (other.limit - this.limit);
+      // If avail is capped to this.limit, it will never overflow,
+      // otherwise, avail may overflow, just be careful here.
+      long diff = other.limit - this.limit;
+      if (this.avail <= Long.MAX_VALUE - diff) {
+        this.avail += diff;
+        this.avail = Math.min(this.avail, other.limit);
+      } else {
+        this.avail = other.limit;
+      }
     }
     this.limit = other.limit;
   }
@@ -142,10 +150,14 @@ public abstract class RateLimiter {
 
   /**
    * Are there enough available resources to allow execution?
-   * @param amount the number of required resources
+   * @param amount the number of required resources, a non-negative number
    * @return true if there are enough available resources, otherwise false
    */
   public synchronized boolean canExecute(final long amount) {
+    if (isBypass()) {
+      return true;
+    }
+
     long refillAmount = refill(limit);
     if (refillAmount == 0 && avail < amount) {
       return false;
@@ -170,13 +182,27 @@ public abstract class RateLimiter {
   }
 
   /**
-   * consume amount available units.
+   * consume amount available units, amount could be a negative number
    * @param amount the number of units to consume
    */
   public synchronized void consume(final long amount) {
-    this.avail -= amount;
-    if (this.avail < 0) {
-      this.avail = 0;
+
+    if (isBypass()) {
+      return;
+    }
+
+    if (amount >= 0 ) {
+      this.avail -= amount;
+      if (this.avail < 0) {
+        this.avail = 0;
+      }
+    } else {
+      if (this.avail <= Long.MAX_VALUE + amount) {
+        this.avail -= amount;
+        this.avail = Math.min(this.avail, this.limit);
+      } else {
+        this.avail = this.limit;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index beb4147..1563878 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
-import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
-import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
 
 /**
  * Simple time based limiter that checks the quota Throttle
@@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter writeSizeLimiter = null;
   private RateLimiter readReqsLimiter = null;
   private RateLimiter readSizeLimiter = null;
-  private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
     if (FixedIntervalRateLimiter.class.getName().equals(
@@ -186,16 +183,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
   }
 
   @Override
-  public void addOperationSize(OperationType type, long size) {
-    avgOpSize.addOperationSize(type, size);
-  }
-
-  @Override
-  public long getAvgOperationSize(OperationType type) {
-    return avgOpSize.getAvgOperationSize(type);
-  }
-
-  @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
     builder.append("TimeBasedLimiter(");

http://git-wip-us.apache.org/repos/asf/hbase/blob/66038b8c/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 4c5a658..1ca6643 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -277,4 +278,133 @@ public class TestRateLimiter {
     limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
     assertEquals(60, limiter.refill(limiter.getLimit()));
   }
+
+  @Test
+  public void testUnconfiguredLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE;
+
+    // For unconfigured limiters, it is supposed to use as much as possible
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, it should be able to execute limit as well
+    testEdge.incValue(100);
+
+    assertTrue(avgLimiter.canExecute(limit));
+    avgLimiter.consume(limit);
+
+    assertTrue(fixLimiter.canExecute(limit));
+    fixLimiter.consume(limit);
+
+    // Make sure that available is Long.MAX_VALUE
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  @Test
+  public void testExtremeLimiters() throws InterruptedException {
+
+    ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+    long limit = Long.MAX_VALUE - 1;
+
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+    RateLimiter fixLimiter = new FixedIntervalRateLimiter();
+    fixLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+    assertEquals(limit, fixLimiter.getAvailable());
+
+    assertTrue(avgLimiter.canExecute(limit / 2));
+    avgLimiter.consume(limit / 2);
+
+    assertTrue(fixLimiter.canExecute(limit / 2));
+    fixLimiter.consume(limit / 2);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 100 milliseconds, both should not be able to execute the limit
+    testEdge.incValue(100);
+
+    assertFalse(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // after 500 milliseconds, average interval limiter should be able to execute the limit
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertFalse(fixLimiter.canExecute(limit));
+
+    // Make sure that available is correct
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
+
+    // after 500 milliseconds, both should be able to execute
+    testEdge.incValue(500);
+    assertTrue(avgLimiter.canExecute(limit));
+    assertTrue(fixLimiter.canExecute(limit));
+
+    // Make sure that available is back at the configured limit (Long.MAX_VALUE - 1)
+    assertTrue(limit == avgLimiter.getAvailable());
+    assertTrue(limit == fixLimiter.getAvailable());
+
+    EnvironmentEdgeManager.reset();
+  }
+
+  /*
+   * This test case is tricky. Basically, it simulates the following events:
+   *           Thread-1                             Thread-2
+   * t0:  canExecute(100) and consume(100)
+   * t1:                                         canExecute(100), avail may be increased by 80
+   * t2:  consume(-80) as actual size is 20
+   * It will check if consume(-80) can handle overflow correctly.
+   */
+  @Test
+  public void testLimiterCompensationOverflow() throws InterruptedException {
+
+    long limit = Long.MAX_VALUE - 1;
+    long guessNumber = 100;
+
+    // Configure the limiter with a near-maximum limit so that compensation could overflow
+    RateLimiter avgLimiter = new AverageIntervalRateLimiter();
+    avgLimiter.set(limit, TimeUnit.SECONDS);
+
+    assertEquals(limit, avgLimiter.getAvailable());
+
+    // The initial guess is 100 bytes.
+    assertTrue(avgLimiter.canExecute(guessNumber));
+    avgLimiter.consume(guessNumber);
+
+    // Make sure that available is whatever left
+    assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
+
+    // Manually set avail to simulate another thread calling canExecute().
+    // It is simulated by consume().
+    avgLimiter.consume(-80);
+    assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
+
+    // Now thread1 compensates 80
+    avgLimiter.consume(-80);
+    assertTrue(limit == avgLimiter.getAvailable());
+  }
 }