You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/07/30 20:45:15 UTC
[pinot] 01/01: Enable hit counter and max hit rate tracker
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch enable-hit-counter-and-max-hit-rate-tracker
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 5d3c2950fe172b0ddee9f64ed44262ab004ecce0
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jul 30 13:44:00 2021 -0700
Enable hit counter and max hit rate tracker
---
.../HelixExternalViewBasedQueryQuotaManager.java | 73 +++++++++++++++-------
.../pinot/broker/queryquota/QueryQuotaEntity.java | 4 ++
...elixExternalViewBasedQueryQuotaManagerTest.java | 28 ++++++---
3 files changed, 77 insertions(+), 28 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 630ac18..49fa9ea 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
* This class is to support the qps quota feature.
* It depends on the broker source change to update the dynamic rate limit,
* which means it only gets updated when a new table added or a broker restarted.
- * TODO: support adding new rate limiter for existing tables without restarting the broker.
*/
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
@@ -82,9 +81,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
@Override
public void processClusterChange(HelixConstants.ChangeType changeType) {
- Preconditions
- .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
- || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
+ Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
+ || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
ExternalView brokerResourceEV = HelixHelper
.getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(),
@@ -109,17 +107,15 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
* @param brokerResource broker resource which stores all the broker states of each table.
*/
public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) {
+ if (tableConfig == null) {
+ LOGGER.info("No query quota to update since table config is null");
+ return;
+ }
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);
// Create rate limiter if query quota config is specified.
- QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
- if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
- LOGGER.info("No qps config specified for table: {}", tableNameWithType);
- removeRateLimiter(tableNameWithType);
- } else {
- createOrUpdateRateLimiter(tableNameWithType, brokerResource, quotaConfig);
- }
+ createOrUpdateRateLimiter(tableNameWithType, brokerResource, tableConfig.getQuotaConfig());
}
/**
@@ -138,10 +134,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
_rateLimiterMap.remove(tableNameWithType);
}
- public boolean containsRateLimiterForTable(String tableNameWithType) {
- return _rateLimiterMap.containsKey(tableNameWithType);
- }
-
/**
* Get QuotaConfig from property store.
* @param tableNameWithType table name with table type.
@@ -165,11 +157,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
QuotaConfig quotaConfig) {
if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
LOGGER.info("No qps config specified for table: {}", tableNameWithType);
+ buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
return;
}
if (brokerResource == null) {
LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType);
+ buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
return;
}
@@ -207,7 +201,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
"Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion());
} else {
- queryQuotaEntity.getRateLimiter().setRate(perBrokerRate);
+ RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
+ if (rateLimiter == null) {
+ rateLimiter = RateLimiter.create(perBrokerRate);
+ queryQuotaEntity.setRateLimiter(rateLimiter);
+ } else {
+ rateLimiter.setRate(perBrokerRate);
+ }
queryQuotaEntity.setNumOnlineBrokers(onlineCount);
queryQuotaEntity.setOverallRate(overallRate);
queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
@@ -225,6 +225,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
}
/**
+ * Build a new query quota entity without a rate limiter, or set the rate limiter to null in an existing query quota entity.
+ */
+ private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) {
+ QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+ if (queryQuotaEntity == null) {
+ // Create a QueryQuotaEntity object without setting a rate limiter.
+ queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+ new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+ _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+ } else {
+ // Set rate limiter to null for an existing QueryQuotaEntity object.
+ queryQuotaEntity.setRateLimiter(null);
+ }
+ }
+
+ /**
* {@inheritDoc}
* <p>Acquires a token from rate limiter based on the table name.
*
@@ -276,6 +292,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
queryQuotaEntity.getMaxQpsTracker().hit();
RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
+ // Return true if no rate limiter is initialized.
+ if (rateLimiter == null) {
+ return true;
+ }
double perBrokerRate = rateLimiter.getRate();
// Emit the qps capacity utilization rate.
@@ -302,6 +322,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
}
@VisibleForTesting
+ public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) {
+ return _rateLimiterMap.get(tableNameWithType);
+ }
+
+ @VisibleForTesting
public void cleanUpRateLimiterMap() {
_rateLimiterMap.clear();
}
@@ -328,6 +353,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
Map.Entry<String, QueryQuotaEntity> entry = it.next();
String tableNameWithType = entry.getKey();
QueryQuotaEntity queryQuotaEntity = entry.getValue();
+ if (queryQuotaEntity.getRateLimiter() == null) {
+ // No rate limiter set, skip this table.
+ continue;
+ }
// Get number of online brokers.
Map<String, String> stateMap = currentBrokerResource.getStateMap(tableNameWithType);
@@ -406,12 +435,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
private void getQueryQuotaEnabledFlagFromInstanceConfig() {
try {
- Map<String, String> instanceConfigsMap =
- HelixHelper.getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(), _helixManager.getClusterManagmentTool());
- String queryRateLimitDisabled = instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false");
+ Map<String, String> instanceConfigsMap = HelixHelper
+ .getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(),
+ _helixManager.getClusterManagmentTool());
+ String queryRateLimitDisabled =
+ instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false");
_queryRateLimitDisabled = Boolean.parseBoolean(queryRateLimitDisabled);
- LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.", _queryRateLimitDisabled ? "DISABLED" : "ENABLED",
- _rateLimiterMap.size());
+ LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.",
+ _queryRateLimitDisabled ? "DISABLED" : "ENABLED", _rateLimiterMap.size());
} catch (ZkNoNodeException e) {
// It's a brand new broker. Skip checking instance config.
_queryRateLimitDisabled = false;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 9c243ae..82d8691 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -40,6 +40,10 @@ public class QueryQuotaEntity {
_tableConfigStatVersion = tableConfigStatVersion;
}
+ public void setRateLimiter(RateLimiter rateLimiter) {
+ _rateLimiter = rateLimiter;
+ }
+
public RateLimiter getRateLimiter() {
return _rateLimiter;
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index c24e407..2a307f2 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -165,7 +165,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
}
@Test
@@ -182,7 +184,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
// Nothing happened since it doesn't have qps quota.
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
@@ -203,7 +207,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
// Drop the offline table won't have any affect since it is table type specific.
_queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
@@ -292,7 +298,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
}
@Test
@@ -309,7 +317,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
}
@Test
@@ -326,7 +336,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
}
@Test
@@ -335,7 +347,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
setQps(tableConfig);
_queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null);
- Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+ Assert.assertNull(queryQuotaEntity.getRateLimiter());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org