You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/08/28 07:02:28 UTC

[incubator-pinot] branch master updated: Add max qps bucket count (#5922)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b78dcc  Add max qps bucket count (#5922)
6b78dcc is described below

commit 6b78dcc465bc65c394f414f647493cb5d15ad15b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Aug 28 00:02:10 2020 -0700

    Add max qps bucket count (#5922)
    
    * Add max qps bucket count
    
    * Introduce StatefulHitCounter
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 15 ++--
 .../apache/pinot/broker/queryquota/HitCounter.java | 26 ++++---
 .../pinot/broker/queryquota/MaxHitRateTracker.java | 81 ++++++++++++++++++++++
 .../pinot/broker/queryquota/QueryQuotaEntity.java  | 18 +++--
 .../broker/queryquota/MaxHitRateTrackerTest.java   | 59 ++++++++++++++++
 .../pinot/common/metrics/AbstractMetrics.java      | 25 +++++++
 .../apache/pinot/common/metrics/BrokerGauge.java   |  1 +
 7 files changed, 204 insertions(+), 21 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 6877d11..a33815d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory;
  */
 public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
-  private static final int TIME_RANGE_IN_SECOND = 1;
+  private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
+  private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
 
   private final BrokerMetrics _brokerMetrics;
   private final String _instanceId;
@@ -196,8 +197,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
 
     double perBrokerRate = overallRate / onlineCount;
     QueryQuotaEntity queryQuotaEntity =
-        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND), onlineCount,
-            overallRate, stat.getVersion());
+        new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+            new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion());
     _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
     LOGGER.info(
         "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
@@ -255,18 +256,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
    */
   private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) {
     // Use hit counter to count the number of hits.
-    queryQuotaEntity.getHitCounter().hit();
+    queryQuotaEntity.getQpsTracker().hit();
+    queryQuotaEntity.getMaxQpsTracker().hit();
 
     RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
     double perBrokerRate = rateLimiter.getRate();
 
     // Emit the qps capacity utilization rate.
-    int numHits = queryQuotaEntity.getHitCounter().getHitCount();
+    int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
     if (_brokerMetrics != null) {
       int percentageOfCapacityUtilization = (int) (numHits * 100 / perBrokerRate);
       LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization);
       _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
           percentageOfCapacityUtilization);
+
+      _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, BrokerGauge.MAX_BURST_QPS,
+          () -> (long) queryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket());
     }
 
     if (!rateLimiter.tryAcquire()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
index c5d5b6e..3d4d8d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
@@ -30,15 +30,21 @@ import java.util.concurrent.atomic.AtomicLongArray;
  * of hits within the last 100 time buckets.
  */
 public class HitCounter {
-  private static int BUCKET_COUNT = 100;
-  private final int _timeBucketWidthMs;
-  private final AtomicLongArray _bucketStartTime;
-  private final AtomicIntegerArray _bucketHitCount;
+  private static int DEFAULT_BUCKET_COUNT = 100;
+  final int _timeBucketWidthMs;
+  final int _bucketCount;
+  final AtomicLongArray _bucketStartTime;
+  final AtomicIntegerArray _bucketHitCount;
 
   public HitCounter(int timeRangeInSeconds) {
-    _timeBucketWidthMs = timeRangeInSeconds * 1000 / BUCKET_COUNT;
-    _bucketStartTime = new AtomicLongArray(BUCKET_COUNT);
-    _bucketHitCount = new AtomicIntegerArray(BUCKET_COUNT);
+    this(timeRangeInSeconds, DEFAULT_BUCKET_COUNT);
+  }
+
+  public HitCounter(int timeRangeInSeconds, int bucketCount) {
+    _bucketCount = bucketCount;
+    _timeBucketWidthMs = timeRangeInSeconds * 1000 / _bucketCount;
+    _bucketStartTime = new AtomicLongArray(_bucketCount);
+    _bucketHitCount = new AtomicIntegerArray(_bucketCount);
   }
 
   /**
@@ -51,7 +57,7 @@ public class HitCounter {
   @VisibleForTesting
   void hit(long timestamp) {
     long numTimeUnits = timestamp / _timeBucketWidthMs;
-    int index = (int) (numTimeUnits % BUCKET_COUNT);
+    int index = (int) (numTimeUnits % _bucketCount);
     if (_bucketStartTime.get(index) == numTimeUnits) {
       _bucketHitCount.incrementAndGet(index);
     } else {
@@ -77,8 +83,8 @@ public class HitCounter {
   int getHitCount(long timestamp) {
     long numTimeUnits = timestamp / _timeBucketWidthMs;
     int count = 0;
-    for (int i = 0; i < BUCKET_COUNT; i++) {
-      if (numTimeUnits - _bucketStartTime.get(i) < BUCKET_COUNT) {
+    for (int i = 0; i < _bucketCount; i++) {
+      if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
         count += _bucketHitCount.get(i);
       }
     }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
new file mode 100644
index 0000000..25c6767
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A stateful version of the hit counter. Like the default hit counter, it maintains a list of buckets.
+ * In addition, it maintains an extra variable called _lastAccessTimestamp which tracks the last access time.
+ * When the stateful hit counter is queried, it first compares the current timestamp with the last access timestamp
+ * to calculate the start index and end index among the buckets. Then, it traverses all the valid candidate buckets.
+ * If more than the maximum time range has passed since the last access, this hit counter will use
+ * the current timestamp minus the default time range to calculate the start index.
+ */
+public class MaxHitRateTracker extends HitCounter {
+  private static int ONE_SECOND_BUCKET_WIDTH_MS = 1000;
+  private static int MAX_TIME_RANGE_FACTOR = 2;
+
+  private final long _maxTimeRangeMs;
+  private final long _defaultTimeRangeMs;
+  private volatile long _lastAccessTimestamp;
+
+  public MaxHitRateTracker(int timeRangeInSeconds) {
+    this(timeRangeInSeconds, timeRangeInSeconds * MAX_TIME_RANGE_FACTOR);
+  }
+
+  private MaxHitRateTracker(int defaultTimeRangeInSeconds, int maxTimeRangeInSeconds) {
+    super(maxTimeRangeInSeconds, (int) (maxTimeRangeInSeconds * 1000L / ONE_SECOND_BUCKET_WIDTH_MS));
+    _defaultTimeRangeMs = defaultTimeRangeInSeconds * 1000L;
+    _maxTimeRangeMs = maxTimeRangeInSeconds * 1000L;
+  }
+
+  /**
+   * Get the maximum count among the buckets
+   */
+  public int getMaxCountPerBucket() {
+    return getMaxCountPerBucket(System.currentTimeMillis());
+  }
+
+  @VisibleForTesting
+  int getMaxCountPerBucket(long now) {
+    // Fall back to the default time range if the hit counter hasn't been queried for more than _maxTimeRangeMs.
+    long then = _lastAccessTimestamp;
+    if (now - then > _maxTimeRangeMs) {
+      then = now - _defaultTimeRangeMs;
+    }
+    long startTimeUnits = then / _timeBucketWidthMs;
+    int startIndex = (int) (startTimeUnits % _bucketCount);
+
+    long numTimeUnits = now / _timeBucketWidthMs;
+    int endIndex = (int) (numTimeUnits % _bucketCount);
+
+    int maxCount = 0;
+    // Skipping the end index here as its bucket hasn't fully gathered all the hits yet.
+    for (int i = startIndex; i != endIndex; i = (++i % _bucketCount)) {
+      if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
+        maxCount = Math.max(_bucketHitCount.get(i), maxCount);
+      }
+    }
+
+    // Update the last access timestamp
+    _lastAccessTimestamp = now;
+    return maxCount;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 40a2c96..9c243ae 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -24,15 +24,17 @@ import com.google.common.util.concurrent.RateLimiter;
 public class QueryQuotaEntity {
 
   private RateLimiter _rateLimiter;
-  private HitCounter _hitCounter;
+  private HitCounter _qpsTracker;
+  private MaxHitRateTracker _maxQpsTracker;
   private int _numOnlineBrokers;
   private double _overallRate;
   private int _tableConfigStatVersion;
 
-  public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounter, int numOnlineBrokers, double overallRate,
-      int tableConfigStatVersion) {
+  public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter qpsTracker, MaxHitRateTracker maxQpsTracker,
+      int numOnlineBrokers, double overallRate, int tableConfigStatVersion) {
     _rateLimiter = rateLimiter;
-    _hitCounter = hitCounter;
+    _qpsTracker = qpsTracker;
+    _maxQpsTracker = maxQpsTracker;
     _numOnlineBrokers = numOnlineBrokers;
     _overallRate = overallRate;
     _tableConfigStatVersion = tableConfigStatVersion;
@@ -42,8 +44,12 @@ public class QueryQuotaEntity {
     return _rateLimiter;
   }
 
-  public HitCounter getHitCounter() {
-    return _hitCounter;
+  public HitCounter getQpsTracker() {
+    return _qpsTracker;
+  }
+
+  public MaxHitRateTracker getMaxQpsTracker() {
+    return _maxQpsTracker;
   }
 
   public int getNumOnlineBrokers() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
new file mode 100644
index 0000000..26ccb1f
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MaxHitRateTrackerTest {
+
+  @Test
+  public void testMaxHitRateTracker() {
+    int timeInSec = 60;
+    MaxHitRateTracker hitCounter = new MaxHitRateTracker(timeInSec);
+    long currentTimestamp = System.currentTimeMillis();
+    for (int i = 0; i < timeInSec; i++) {
+      for (int j = 0; j < 5; j++) {
+        hitCounter.hit(currentTimestamp + i * 1000);
+      }
+    }
+    long latestTimeStamp = currentTimestamp + (timeInSec - 1) * 1000;
+    Assert.assertNotNull(hitCounter);
+    Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+    // 2 seconds have passed, the hit counter should return 5 as well since the count in the last bucket could increase.
+    latestTimeStamp = latestTimeStamp + 2000L;
+    Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+    // This time it should return 0 as the internal lastAccessTimestamp has already been updated and there are no more hits in the gap.
+    latestTimeStamp = latestTimeStamp + 2000L;
+    Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+    // Increment the hit count in this second and we should see the result become 1.
+    hitCounter.hit(latestTimeStamp);
+    latestTimeStamp = latestTimeStamp + 2000L;
+    Assert.assertEquals(1, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+    // More than a time range period has passed and the hit counter should return 0 as there are no hits.
+    hitCounter.hit(latestTimeStamp);
+    latestTimeStamp = latestTimeStamp + timeInSec * 2 * 1000L + 2000L;
+    Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index cf40f69..1b76ed1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -393,6 +393,31 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     }
   }
 
+  public void addCallbackTableGaugeIfNeeded(final String tableName, final G gauge, final Callable<Long> valueCallback) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + getTableName(tableName);
+
+    addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
+  }
+
+  /**
+   * Similar to the addCallbackGauge method.
+   * This method may be called multiple times, but the callback function will be registered only once.
+   * @param metricName The name of the metric
+   * @param valueCallback The callback function used to retrieve the value of the gauge
+   */
+  public void addCallbackGaugeIfNeeded(final String metricName, final Callable<Long> valueCallback) {
+    if (!_gaugeValues.containsKey(metricName)) {
+      synchronized (_gaugeValues) {
+        if (!_gaugeValues.containsKey(metricName)) {
+          _gaugeValues.put(metricName, new AtomicLong(0L));
+          addCallbackGauge(metricName, valueCallback);
+        }
+      }
+    }
+  }
+
   /**
    * Adds a new gauge whose values are retrieved from a callback function.
    *
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index c819382..827fbe5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
  */
 public enum BrokerGauge implements AbstractMetrics.Gauge {
   QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false),
+  MAX_BURST_QPS("tables", false),
   QUERY_RATE_LIMIT_DISABLED("queryQuota", true),
   NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org