You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/08/25 21:49:04 UTC

[incubator-pinot] branch add-max-qps-bucket-count created (now b66b420)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-max-qps-bucket-count
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at b66b420  Add max qps bucket count

This branch includes the following new commits:

     new b66b420  Add max qps bucket count

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add max qps bucket count

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-max-qps-bucket-count
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b66b420a686ed7c61a48824ea6148ba9e3e50ea3
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Tue Aug 25 14:48:34 2020 -0700

    Add max qps bucket count
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 15 ++++++---
 .../apache/pinot/broker/queryquota/HitCounter.java | 39 ++++++++++++++++++----
 .../pinot/broker/queryquota/QueryQuotaEntity.java  | 18 ++++++----
 .../pinot/broker/queryquota/HitCounterTest.java    | 35 +++++++++++++++++++
 .../pinot/common/metrics/AbstractMetrics.java      | 25 ++++++++++++++
 .../apache/pinot/common/metrics/BrokerGauge.java   |  1 +
 6 files changed, 115 insertions(+), 18 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 6877d11..aeecf47 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory;
  */
 public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
-  private static final int TIME_RANGE_IN_SECOND = 1;
+  private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
+  private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
 
   private final BrokerMetrics _brokerMetrics;
   private final String _instanceId;
@@ -196,8 +197,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
 
     double perBrokerRate = overallRate / onlineCount;
     QueryQuotaEntity queryQuotaEntity =
-        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND), onlineCount,
-            overallRate, stat.getVersion());
+        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+            new HitCounter(ONE_MINUTE_TIME_RANGE_IN_SECOND, 60), onlineCount, overallRate, stat.getVersion());
     _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
     LOGGER.info(
         "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
@@ -255,18 +256,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
    */
   private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) {
     // Use hit counter to count the number of hits.
-    queryQuotaEntity.getHitCounter().hit();
+    queryQuotaEntity.getHitCounterInSecond().hit();
+    queryQuotaEntity.getHitCounterInMinute().hit();
 
     RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
     double perBrokerRate = rateLimiter.getRate();
 
     // Emit the qps capacity utilization rate.
-    int numHits = queryQuotaEntity.getHitCounter().getHitCount();
+    int numHits = queryQuotaEntity.getHitCounterInSecond().getHitCount();
     if (_brokerMetrics != null) {
       int percentageOfCapacityUtilization = (int) (numHits * 100 / perBrokerRate);
       LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization);
       _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
           percentageOfCapacityUtilization);
+
+      _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, BrokerGauge.MAX_QPS_IN_ONE_MINUTE,
+          () -> (long) queryQuotaEntity.getHitCounterInMinute().getMaxCountPerBucket());
     }
 
     if (!rateLimiter.tryAcquire()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
index c5d5b6e..ee1d311 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
@@ -30,15 +30,21 @@ import java.util.concurrent.atomic.AtomicLongArray;
  * of hits within the last 100 time buckets.
  */
 public class HitCounter {
-  private static int BUCKET_COUNT = 100;
+  private static int DEFAULT_BUCKET_COUNT = 100;
   private final int _timeBucketWidthMs;
+  private final int _bucketCount;
   private final AtomicLongArray _bucketStartTime;
   private final AtomicIntegerArray _bucketHitCount;
 
   public HitCounter(int timeRangeInSeconds) {
-    _timeBucketWidthMs = timeRangeInSeconds * 1000 / BUCKET_COUNT;
-    _bucketStartTime = new AtomicLongArray(BUCKET_COUNT);
-    _bucketHitCount = new AtomicIntegerArray(BUCKET_COUNT);
+    this(timeRangeInSeconds, DEFAULT_BUCKET_COUNT);
+  }
+
+  public HitCounter(int timeRangeInSeconds, int bucketCount) {
+    _bucketCount = bucketCount;
+    _timeBucketWidthMs = timeRangeInSeconds * 1000 / _bucketCount;
+    _bucketStartTime = new AtomicLongArray(_bucketCount);
+    _bucketHitCount = new AtomicIntegerArray(_bucketCount);
   }
 
   /**
@@ -51,7 +57,7 @@ public class HitCounter {
   @VisibleForTesting
   void hit(long timestamp) {
     long numTimeUnits = timestamp / _timeBucketWidthMs;
-    int index = (int) (numTimeUnits % BUCKET_COUNT);
+    int index = (int) (numTimeUnits % _bucketCount);
     if (_bucketStartTime.get(index) == numTimeUnits) {
       _bucketHitCount.incrementAndGet(index);
     } else {
@@ -77,11 +83,30 @@ public class HitCounter {
   int getHitCount(long timestamp) {
     long numTimeUnits = timestamp / _timeBucketWidthMs;
     int count = 0;
-    for (int i = 0; i < BUCKET_COUNT; i++) {
-      if (numTimeUnits - _bucketStartTime.get(i) < BUCKET_COUNT) {
+    for (int i = 0; i < _bucketCount; i++) {
+      if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
         count += _bucketHitCount.get(i);
       }
     }
     return count;
   }
+
+  /**
+   * Get the maximum count among the buckets
+   */
+  public int getMaxCountPerBucket() {
+    return getMaxCountPerBucket(System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  int getMaxCountPerBucket(long timestamp) {
+    long numTimeUnits = timestamp / _timeBucketWidthMs;
+    int maxCount = 0;
+    for (int i = 0; i < _bucketCount; i++) {
+      if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
+        maxCount = Math.max(_bucketHitCount.get(i), maxCount);
+      }
+    }
+    return maxCount;
+  }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 40a2c96..6f9927f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -24,15 +24,17 @@ import com.google.common.util.concurrent.RateLimiter;
 public class QueryQuotaEntity {
 
   private RateLimiter _rateLimiter;
-  private HitCounter _hitCounter;
+  private HitCounter _hitCounterInSecond;
+  private HitCounter _hitCounterInMinute;
   private int _numOnlineBrokers;
   private double _overallRate;
   private int _tableConfigStatVersion;
 
-  public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounter, int numOnlineBrokers, double overallRate,
-      int tableConfigStatVersion) {
+  public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounterInSecond, HitCounter hitCounterInMinute,
+      int numOnlineBrokers, double overallRate, int tableConfigStatVersion) {
     _rateLimiter = rateLimiter;
-    _hitCounter = hitCounter;
+    _hitCounterInSecond = hitCounterInSecond;
+    _hitCounterInMinute = hitCounterInMinute;
     _numOnlineBrokers = numOnlineBrokers;
     _overallRate = overallRate;
     _tableConfigStatVersion = tableConfigStatVersion;
@@ -42,8 +44,12 @@ public class QueryQuotaEntity {
     return _rateLimiter;
   }
 
-  public HitCounter getHitCounter() {
-    return _hitCounter;
+  public HitCounter getHitCounterInSecond() {
+    return _hitCounterInSecond;
+  }
+
+  public HitCounter getHitCounterInMinute() {
+    return _hitCounterInMinute;
   }
 
   public int getNumOnlineBrokers() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HitCounterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HitCounterTest.java
index 635838b..4f10cb1 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HitCounterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HitCounterTest.java
@@ -114,4 +114,39 @@ public class HitCounterTest {
       System.out.println(duration);
     }
   }
+
+  @Test
+  public void testGetMaxCountPerBucket() {
+    int timeInSec = 60;
+    int bucketCount = 60;
+    HitCounter hitCounter = new HitCounter(timeInSec, bucketCount);
+    long currentTimestamp = System.currentTimeMillis();
+    for (int i = 0; i < timeInSec; i++) {
+      for (int j = 0; j < 5; j++) {
+        hitCounter.hit(currentTimestamp + i * 1000);
+      }
+    }
+    long latestTimeStamp = currentTimestamp + (timeInSec - 1) * 1000;
+    Assert.assertNotNull(hitCounter);
+    Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+    // Increase hit count by 1
+    Random random = new Random();
+    hitCounter.hit(currentTimestamp + random.nextInt(60) * 1000);
+    Assert.assertEquals(6, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+
+    hitCounter = new HitCounter(timeInSec, bucketCount);
+    // Set the iteration larger than the time range
+    int newIteration = timeInSec + 10;
+    for (int i = 0; i < newIteration; i++) {
+      for (int j = 0; j < 5; j++) {
+        hitCounter.hit(currentTimestamp + i * 1000);
+      }
+    }
+    latestTimeStamp = currentTimestamp + (newIteration - 1) * 1000;
+    // The max count is still the same
+    Assert.assertNotNull(hitCounter);
+    Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index cf40f69..1b76ed1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -393,6 +393,31 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     }
   }
 
+  public void addCallbackTableGaugeIfNeeded(final String tableName, final G gauge, final Callable<Long> valueCallback) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName);
+
+    addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
+  }
+
+  /**
+   * Similar to addCallbackGauge method.
+   * This method may be called multiple times, but the callback gauge is registered only once per metric name.
+   * @param metricName The name of the metric
+   * @param valueCallback The callback function used to retrieve the value of the gauge
+   */
+  public void addCallbackGaugeIfNeeded(final String metricName, final Callable<Long> valueCallback) {
+    if (!_gaugeValues.containsKey(metricName)) {
+      synchronized (_gaugeValues) {
+        if (!_gaugeValues.containsKey(metricName)) {
+          _gaugeValues.put(metricName, new AtomicLong(0L));
+          addCallbackGauge(metricName, valueCallback);
+        }
+      }
+    }
+  }
+
   /**
    * Adds a new gauge whose values are retrieved from a callback function.
    *
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index c819382..d8e5c28 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
  */
 public enum BrokerGauge implements AbstractMetrics.Gauge {
   QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false),
+  MAX_QPS_IN_ONE_MINUTE("tables", false),
   QUERY_RATE_LIMIT_DISABLED("queryQuota", true),
   NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org