You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/07/30 20:45:15 UTC

[pinot] 01/01: Enable hit counter and max hit rate tracker

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch enable-hit-counter-and-max-hit-rate-tracker
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 5d3c2950fe172b0ddee9f64ed44262ab004ecce0
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Jul 30 13:44:00 2021 -0700

    Enable hit counter and max hit rate tracker
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 73 +++++++++++++++-------
 .../pinot/broker/queryquota/QueryQuotaEntity.java  |  4 ++
 ...elixExternalViewBasedQueryQuotaManagerTest.java | 28 ++++++---
 3 files changed, 77 insertions(+), 28 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 630ac18..49fa9ea 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -51,7 +51,6 @@ import org.slf4j.LoggerFactory;
  * This class is to support the qps quota feature.
  * It depends on the broker source change to update the dynamic rate limit,
  *  which means it only gets updated when a new table added or a broker restarted.
- * TODO: support adding new rate limiter for existing tables without restarting the broker.
  */
 public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
@@ -82,9 +81,8 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
 
   @Override
   public void processClusterChange(HelixConstants.ChangeType changeType) {
-    Preconditions
-        .checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
-            || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
+    Preconditions.checkState(changeType == HelixConstants.ChangeType.EXTERNAL_VIEW
+        || changeType == HelixConstants.ChangeType.INSTANCE_CONFIG, "Illegal change type: " + changeType);
     if (changeType == HelixConstants.ChangeType.EXTERNAL_VIEW) {
       ExternalView brokerResourceEV = HelixHelper
           .getExternalViewForResource(_helixManager.getClusterManagmentTool(), _helixManager.getClusterName(),
@@ -109,17 +107,15 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
    * @param brokerResource broker resource which stores all the broker states of each table.
    */
   public void initOrUpdateTableQueryQuota(TableConfig tableConfig, ExternalView brokerResource) {
+    if (tableConfig == null) {
+      LOGGER.info("No query quota to update since table config is null");
+      return;
+    }
     String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Initializing rate limiter for table {}", tableNameWithType);
 
     // Create rate limiter if query quota config is specified.
-    QuotaConfig quotaConfig = tableConfig.getQuotaConfig();
-    if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
-      LOGGER.info("No qps config specified for table: {}", tableNameWithType);
-      removeRateLimiter(tableNameWithType);
-    } else {
-      createOrUpdateRateLimiter(tableNameWithType, brokerResource, quotaConfig);
-    }
+    createOrUpdateRateLimiter(tableNameWithType, brokerResource, tableConfig.getQuotaConfig());
   }
 
   /**
@@ -138,10 +134,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
     _rateLimiterMap.remove(tableNameWithType);
   }
 
-  public boolean containsRateLimiterForTable(String tableNameWithType) {
-    return _rateLimiterMap.containsKey(tableNameWithType);
-  }
-
   /**
    * Get QuotaConfig from property store.
    * @param tableNameWithType table name with table type.
@@ -165,11 +157,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
       QuotaConfig quotaConfig) {
     if (quotaConfig == null || quotaConfig.getMaxQueriesPerSecond() == null) {
       LOGGER.info("No qps config specified for table: {}", tableNameWithType);
+      buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
       return;
     }
 
     if (brokerResource == null) {
       LOGGER.warn("Failed to init qps quota for table {}. No broker resource connected!", tableNameWithType);
+      buildEmptyOrResetRateLimiterInQueryQuotaEntity(tableNameWithType);
       return;
     }
 
@@ -207,7 +201,13 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
           "Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}. Table config stat version: {}",
           tableNameWithType, overallRate, perBrokerRate, onlineCount, stat.getVersion());
     } else {
-      queryQuotaEntity.getRateLimiter().setRate(perBrokerRate);
+      RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
+      if (rateLimiter == null) {
+        rateLimiter = RateLimiter.create(perBrokerRate);
+        queryQuotaEntity.setRateLimiter(rateLimiter);
+      } else {
+        rateLimiter.setRate(perBrokerRate);
+      }
       queryQuotaEntity.setNumOnlineBrokers(onlineCount);
       queryQuotaEntity.setOverallRate(overallRate);
       queryQuotaEntity.setTableConfigStatVersion(stat.getVersion());
@@ -225,6 +225,22 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
   }
 
   /**
+   * Build an empty rate limiter in the new query quota entity, or set the rate limiter to null in an existing query quota entity.
+   */
+  private void buildEmptyOrResetRateLimiterInQueryQuotaEntity(String tableNameWithType) {
+    QueryQuotaEntity queryQuotaEntity = _rateLimiterMap.get(tableNameWithType);
+    if (queryQuotaEntity == null) {
+      // Create an QueryQuotaEntity object without setting a rate limiter.
+      queryQuotaEntity = new QueryQuotaEntity(null, new HitCounter(ONE_SECOND_TIME_RANGE_IN_SECOND),
+          new MaxHitRateTracker(ONE_MINUTE_TIME_RANGE_IN_SECOND), 0, 0, 0);
+      _rateLimiterMap.put(tableNameWithType, queryQuotaEntity);
+    } else {
+      // Set rate limiter to null for an existing QueryQuotaEntity object.
+      queryQuotaEntity.setRateLimiter(null);
+    }
+  }
+
+  /**
    * {@inheritDoc}
    * <p>Acquires a token from rate limiter based on the table name.
    *
@@ -276,6 +292,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
     queryQuotaEntity.getMaxQpsTracker().hit();
 
     RateLimiter rateLimiter = queryQuotaEntity.getRateLimiter();
+    // Return true if no rate limiter is initialized.
+    if (rateLimiter == null) {
+      return true;
+    }
     double perBrokerRate = rateLimiter.getRate();
 
     // Emit the qps capacity utilization rate.
@@ -302,6 +322,11 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
   }
 
   @VisibleForTesting
+  public QueryQuotaEntity getRateLimiterForTable(String tableNameWithType) {
+    return _rateLimiterMap.get(tableNameWithType);
+  }
+
+  @VisibleForTesting
   public void cleanUpRateLimiterMap() {
     _rateLimiterMap.clear();
   }
@@ -328,6 +353,10 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
       Map.Entry<String, QueryQuotaEntity> entry = it.next();
       String tableNameWithType = entry.getKey();
       QueryQuotaEntity queryQuotaEntity = entry.getValue();
+      if (queryQuotaEntity.getRateLimiter() == null) {
+        // No rate limiter set, skip this table.
+        continue;
+      }
 
       // Get number of online brokers.
       Map<String, String> stateMap = currentBrokerResource.getStateMap(tableNameWithType);
@@ -406,12 +435,14 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan
 
   private void getQueryQuotaEnabledFlagFromInstanceConfig() {
     try {
-      Map<String, String> instanceConfigsMap =
-          HelixHelper.getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(), _helixManager.getClusterManagmentTool());
-      String queryRateLimitDisabled = instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false");
+      Map<String, String> instanceConfigsMap = HelixHelper
+          .getInstanceConfigsMapFor(_instanceId, _helixManager.getClusterName(),
+              _helixManager.getClusterManagmentTool());
+      String queryRateLimitDisabled =
+          instanceConfigsMap.getOrDefault(CommonConstants.Helix.QUERY_RATE_LIMIT_DISABLED, "false");
       _queryRateLimitDisabled = Boolean.parseBoolean(queryRateLimitDisabled);
-      LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.", _queryRateLimitDisabled ? "DISABLED" : "ENABLED",
-          _rateLimiterMap.size());
+      LOGGER.info("Set query rate limiting to: {} for all {} tables in this broker.",
+          _queryRateLimitDisabled ? "DISABLED" : "ENABLED", _rateLimiterMap.size());
     } catch (ZkNoNodeException e) {
       // It's a brand new broker. Skip checking instance config.
       _queryRateLimitDisabled = false;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
index 9c243ae..82d8691 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaEntity.java
@@ -40,6 +40,10 @@ public class QueryQuotaEntity {
     _tableConfigStatVersion = tableConfigStatVersion;
   }
 
+  public void setRateLimiter(RateLimiter rateLimiter) {
+    _rateLimiter = rateLimiter;
+  }
+
   public RateLimiter getRateLimiter() {
     return _rateLimiter;
   }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
index c24e407..2a307f2 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManagerTest.java
@@ -165,7 +165,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
   }
 
   @Test
@@ -182,7 +184,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
 
     // Nothing happened since it doesn't have qps quota.
     _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
@@ -203,7 +207,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
 
     // Drop the offline table won't have any affect since it is table type specific.
     _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
@@ -292,7 +298,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
   }
 
   @Test
@@ -309,7 +317,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
   }
 
   @Test
@@ -326,7 +336,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(REALTIME_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
   }
 
   @Test
@@ -335,7 +347,9 @@ public class HelixExternalViewBasedQueryQuotaManagerTest {
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
     _queryQuotaManager.initOrUpdateTableQueryQuota(tableConfig, null);
-    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    QueryQuotaEntity queryQuotaEntity = _queryQuotaManager.getRateLimiterForTable(OFFLINE_TABLE_NAME);
+    Assert.assertNull(queryQuotaEntity.getRateLimiter());
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org