You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/08/28 07:02:28 UTC
[incubator-pinot] branch master updated: Add max qps bucket count
(#5922)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6b78dcc Add max qps bucket count (#5922)
6b78dcc is described below
commit 6b78dcc465bc65c394f414f647493cb5d15ad15b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Aug 28 00:02:10 2020 -0700
Add max qps bucket count (#5922)
* Add max qps bucket count
* Introduce StatefulHitCounter
Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
.../HelixExternalViewBasedQueryQuotaManager.java | 15 ++--
.../apache/pinot/broker/queryquota/HitCounter.java | 26 ++++---
.../pinot/broker/queryquota/MaxHitRateTracker.java | 81 ++++++++++++++++++++++
.../pinot/broker/queryquota/QueryQuotaEntity.java | 18 +++--
.../broker/queryquota/MaxHitRateTrackerTest.java | 59 ++++++++++++++++
.../pinot/common/metrics/AbstractMetrics.java | 25 +++++++
.../apache/pinot/common/metrics/BrokerGauge.java | 1 +
7 files changed, 204 insertions(+), 21 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 6877d11..a33815d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -55,7 +55,8 @@ import org.slf4j.LoggerFactory;
*/
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
- private static final int TIME_RANGE_IN_SECOND = 1;
+ private static final int ONE_SECOND_TIME_RANGE_IN_SECOND = 1;
+ private static final int ONE_MINUTE_TIME_RANGE_IN_SECOND = 60;
private final BrokerMetrics _brokerMetrics;
private final String _instanceId;
@@ -196,8 +197,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
double perBrokerRate = overallRate / onlineCount;
QueryQuotaEntity queryQuotaEntity =
- new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND), onlineCount,
- overallRate, stat.getVersion());
+ new QueryQuotaEntity(RateLimiter.create(perBrokerRate), new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), onlineCount, overallRate, stat.getVersion());
_rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
LOGGER.info(
"Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
@@ -255,18 +256,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
*/
private boolean tryAcquireToken(String tableNameWithType, QueryQuotaEntity queryQuotaEntity) {
// Use hit counter to count the number of hits.
- queryQuotaEntity.getHitCounter().hit();
+ queryQuotaEntity.getQpsTracker().hit();
+ queryQuotaEntity.getMaxQpsTracker().hit();
RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
double perBrokerRate = rateLimiter.getRate();
// Emit the qps capacity utilization rate.
- int numHits = queryQuotaEntity.getHitCounter().getHitCount();
+ int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
if (_brokerMetrics != null) {
int percentageOfCapacityUtilization = (int) (numHits * 100 / perBrokerRate);
LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization);
_brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
percentageOfCapacityUtilization);
+
+ _brokerMetrics.addCallbackTableGaugeIfNeeded(tableNameWithType, BrokerGauge.MAX_BURST_QPS,
+ () -> (long) queryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket());
}
if (!rateLimiter.tryAcquire()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
index c5d5b6e..3d4d8d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
@@ -30,15 +30,21 @@ import java.util.concurrent.atomic.AtomicLongArray;
* of hits within the last 100 time buckets.
*/
public class HitCounter {
- private static int BUCKET_COUNT = 100;
- private final int _timeBucketWidthMs;
- private final AtomicLongArray _bucketStartTime;
- private final AtomicIntegerArray _bucketHitCount;
+ private static int DEFAULT_BUCKET_COUNT = 100;
+ final int _timeBucketWidthMs;
+ final int _bucketCount;
+ final AtomicLongArray _bucketStartTime;
+ final AtomicIntegerArray _bucketHitCount;
public HitCounter(int timeRangeInSeconds) {
- _timeBucketWidthMs = timeRangeInSeconds * 1000 / BUCKET_COUNT;
- _bucketStartTime = new AtomicLongArray(BUCKET_COUNT);
- _bucketHitCount = new AtomicIntegerArray(BUCKET_COUNT);
+ this(timeRangeInSeconds, DEFAULT_BUCKET_COUNT);
+ }
+
+ public HitCounter(int timeRangeInSeconds, int bucketCount) {
+ _bucketCount = bucketCount;
+ _timeBucketWidthMs = timeRangeInSeconds * 1000 / _bucketCount;
+ _bucketStartTime = new AtomicLongArray(_bucketCount);
+ _bucketHitCount = new AtomicIntegerArray(_bucketCount);
}
/**
@@ -51,7 +57,7 @@ public class HitCounter {
@VisibleForTesting
void hit(long timestamp) {
long numTimeUnits = timestamp / _timeBucketWidthMs;
- int index = (int) (numTimeUnits % BUCKET_COUNT);
+ int index = (int) (numTimeUnits % _bucketCount);
if (_bucketStartTime.get(index) == numTimeUnits) {
_bucketHitCount.incrementAndGet(index);
} else {
@@ -77,8 +83,8 @@ public class HitCounter {
int getHitCount(long timestamp) {
long numTimeUnits = timestamp / _timeBucketWidthMs;
int count = 0;
- for (int i = 0; i < BUCKET_COUNT; i++) {
- if (numTimeUnits - _bucketStartTime.get(i) < BUCKET_COUNT) {
+ for (int i = 0; i < _bucketCount; i++) {
+ if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
count += _bucketHitCount.get(i);
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
new file mode 100644
index 0000000..25c6767
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A stateful version of hit counter. Similar to the default hit counter, it maintains a list of buckets.
+ * Whereas it maintains an extra variable called _lastAccessTimestamp which tracks the last access time.
+ * If the stateful hit counter gets queried, it firstly compares the current timestamp and the last access timestamp,
+ * calculating the start index and end index among the buckets. Then, it traverses through all the valid candidate buckets.
+ * If the current timestamp has exceeded the current time range of all the buckets, this hit counter will use
+ * the current timestamp minus the default time queried time range to calculate the start time index.
+ */
+public class MaxHitRateTracker extends HitCounter {
+ private static int ONE_SECOND_BUCKET_WIDTH_MS = 1000;
+ private static int MAX_TIME_RANGE_FACTOR = 2;
+
+ private final long _maxTimeRangeMs;
+ private final long _defaultTimeRangeMs;
+ private volatile long _lastAccessTimestamp;
+
+ public MaxHitRateTracker(int timeRangeInSeconds) {
+ this(timeRangeInSeconds, timeRangeInSeconds * MAX_TIME_RANGE_FACTOR);
+ }
+
+ private MaxHitRateTracker(int defaultTimeRangeInSeconds, int maxTimeRangeInSeconds) {
+ super(maxTimeRangeInSeconds, (int) (maxTimeRangeInSeconds * 1000L / ONE_SECOND_BUCKET_WIDTH_MS));
+ _defaultTimeRangeMs = defaultTimeRangeInSeconds * 1000L;
+ _maxTimeRangeMs = maxTimeRangeInSeconds * 1000L;
+ }
+
+ /**
+ * Get the maximum count among the buckets
+ */
+ public int getMaxCountPerBucket() {
+ return getMaxCountPerBucket(System.currentTimeMillis());
+ }
+
+ @VisibleForTesting
+ int getMaxCountPerBucket(long now) {
+ // Update the last access timestamp if the hit counter didn't get queried for more than _maxTimeRangeMs.
+ long then = _lastAccessTimestamp;
+ if (now - then > _maxTimeRangeMs) {
+ then = now - _defaultTimeRangeMs;
+ }
+ long startTimeUnits = then / _timeBucketWidthMs;
+ int startIndex = (int) (startTimeUnits % _bucketCount);
+
+ long numTimeUnits = now / _timeBucketWidthMs;
+ int endIndex = (int) (numTimeUnits % _bucketCount);
+
+ int maxCount = 0;
+ // Skipping the end index here as its bucket hasn't fully gathered all the hits yet.
+ for (int i = startIndex; i != endIndex; i = (++i % _bucketCount)) {
+ if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
+ maxCount = Math.max(_bucketHitCount.get(i), maxCount);
+ }
+ }
+
+ // Update the last access timestamp
+ _lastAccessTimestamp = now;
+ return maxCount;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 40a2c96..9c243ae 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -24,15 +24,17 @@ import com.google.common.util.concurrent.RateLimiter;
public class QueryQuotaEntity {
private RateLimiter _rateLimiter;
- private HitCounter _hitCounter;
+ private HitCounter _qpsTracker;
+ private MaxHitRateTracker _maxQpsTracker;
private int _numOnlineBrokers;
private double _overallRate;
private int _tableConfigStatVersion;
- public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter hitCounter, int numOnlineBrokers, double overallRate,
- int tableConfigStatVersion) {
+ public QueryQuotaEntity(RateLimiter rateLimiter, HitCounter qpsTracker, MaxHitRateTracker maxQpsTracker,
+ int numOnlineBrokers, double overallRate, int tableConfigStatVersion) {
_rateLimiter = rateLimiter;
- _hitCounter = hitCounter;
+ _qpsTracker = qpsTracker;
+ _maxQpsTracker = maxQpsTracker;
_numOnlineBrokers = numOnlineBrokers;
_overallRate = overallRate;
_tableConfigStatVersion = tableConfigStatVersion;
@@ -42,8 +44,12 @@ public class QueryQuotaEntity {
return _rateLimiter;
}
- public HitCounter getHitCounter() {
- return _hitCounter;
+ public HitCounter getQpsTracker() {
+ return _qpsTracker;
+ }
+
+ public MaxHitRateTracker getMaxQpsTracker() {
+ return _maxQpsTracker;
}
public int getNumOnlineBrokers() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
new file mode 100644
index 0000000..26ccb1f
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MaxHitRateTrackerTest {
+
+ @Test
+ public void testMaxHitRateTracker() {
+ int timeInSec = 60;
+ MaxHitRateTracker hitCounter = new MaxHitRateTracker(timeInSec);
+ long currentTimestamp = System.currentTimeMillis();
+ for (int i = 0; i < timeInSec; i++) {
+ for (int j = 0; j < 5; j++) {
+ hitCounter.hit(currentTimestamp + i * 1000);
+ }
+ }
+ long latestTimeStamp = currentTimestamp + (timeInSec - 1) * 1000;
+ Assert.assertNotNull(hitCounter);
+ Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+ // 2 seconds have passed, the hit counter should return 5 as well since the count in the last bucket could increase.
+ latestTimeStamp = latestTimeStamp + 2000L;
+ Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+ // This time it should return 0 as the internal lastAccessTimestamp has already been updated and there is no more hits between the gap.
+ latestTimeStamp = latestTimeStamp + 2000L;
+ Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+ // Increment the hit in this second and we should see the result becomes 1.
+ hitCounter.hit(latestTimeStamp);
+ latestTimeStamp = latestTimeStamp + 2000L;
+ Assert.assertEquals(1, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+
+ // More than a time range period has passed and the hit counter should return 0 as there is no hits.
+ hitCounter.hit(latestTimeStamp);
+ latestTimeStamp = latestTimeStamp + timeInSec * 2 * 1000L + 2000L;
+ Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index cf40f69..1b76ed1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -393,6 +393,31 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
}
}
+ public void addCallbackTableGaugeIfNeeded(final String tableName, final G gauge, final Callable<Long> valueCallback) {
+ final String fullGaugeName;
+ String gaugeName = gauge.getGaugeName();
+ fullGaugeName = gaugeName + "." + getTableName(tableName);
+
+ addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
+ }
+
+ /**
+ * Similar to addCallbackGauge method.
+ * This method may be called multiple times, while it will be registered to callback function only once.
+ * @param metricName The name of the metric
+ * @param valueCallback The callback function used to retrieve the value of the gauge
+ */
+ public void addCallbackGaugeIfNeeded(final String metricName, final Callable<Long> valueCallback) {
+ if (!_gaugeValues.containsKey(metricName)) {
+ synchronized (_gaugeValues) {
+ if (!_gaugeValues.containsKey(metricName)) {
+ _gaugeValues.put(metricName, new AtomicLong(0L));
+ addCallbackGauge(metricName, valueCallback);
+ }
+ }
+ }
+ }
+
/**
* Adds a new gauge whose values are retrieved from a callback function.
*
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index c819382..827fbe5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.Utils;
*/
public enum BrokerGauge implements AbstractMetrics.Gauge {
QUERY_QUOTA_CAPACITY_UTILIZATION_RATE("tables", false),
+ MAX_BURST_QPS("tables", false),
QUERY_RATE_LIMIT_DISABLED("queryQuota", true),
NETTY_CONNECTION_CONNECT_TIME_MS("nettyConnection", true);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org