You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/19 04:57:08 UTC

[incubator-pinot] 01/01: Decouple BrokerServerBuilder from Helix

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch decouple_broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3594a32faad7401ba87fe4bbbe5f3b09e54d4b4f
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Apr 18 21:54:09 2019 -0700

    Decouple BrokerServerBuilder from Helix
    
    BrokerServerBuilder should be independent of Helix and only
    perform broker functionality.
    
    - Add QueryQuotaManager interface for quota management
    - Remove LiveInstanceChangeHandler from BrokerServerBuilder
      constructor
    - Move constants into CommonConstants
---
 .../pinot/broker/broker/BrokerServerBuilder.java   | 39 ++++-----
 ...okerResourceOnlineOfflineStateModelFactory.java | 16 ++--
 .../broker/broker/helix/HelixBrokerStarter.java    | 36 ++++----
 ...nager.java => HelixBasedQueryQuotaManager.java} | 12 +--
 .../pinot/broker/queryquota/QueryQuotaManager.java | 29 +++++++
 .../requesthandler/BaseBrokerRequestHandler.java   | 18 ++--
 .../ConnectionPoolBrokerRequestHandler.java        | 15 ++--
 .../SingleConnectionBrokerRequestHandler.java      |  6 +-
 ...t.java => HelixBasedQueryQuotaManagerTest.java} | 98 +++++++++++-----------
 .../apache/pinot/common/utils/CommonConstants.java | 12 +++
 .../pinot/integration/tests/ClusterTest.java       |  8 +-
 11 files changed, 161 insertions(+), 128 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index a63e954..1880cc5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -22,8 +22,7 @@ import com.google.common.base.Preconditions;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
@@ -36,33 +35,24 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
+
 
 public class BrokerServerBuilder {
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class);
 
-  public static final String DELAY_SHUTDOWN_TIME_MS_CONFIG = "pinot.broker.delayShutdownTimeMs";
-  public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000;
-  public static final String ACCESS_CONTROL_PREFIX = "pinot.broker.access.control";
-  public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
-  public static final String TABLE_LEVEL_METRICS_CONFIG = "pinot.broker.enableTableLevelMetrics";
-  public static final String REQUEST_HANDLER_TYPE_CONFIG = "pinot.broker.requestHandlerType";
-  public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
-  public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
-  public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
-
   public enum State {
     INIT, STARTING, RUNNING, SHUTTING_DOWN, SHUTDOWN
   }
 
   // Running State Of broker
-  private final AtomicReference<State> _state = new AtomicReference<>();
+  private final AtomicReference<State> _state = new AtomicReference<>(State.INIT);
 
   private final Configuration _config;
   private final long _delayedShutdownTimeMs;
   private final RoutingTable _routingTable;
   private final TimeBoundaryService _timeBoundaryService;
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final QueryQuotaManager _queryQuotaManager;
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
   private final BrokerMetrics _brokerMetrics;
@@ -70,34 +60,33 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstanceChangeHandler liveInstanceChangeHandler, TableQueryQuotaManager tableQueryQuotaManager) {
-    _state.set(State.INIT);
+      QueryQuotaManager queryQuotaManager) {
     _config = config;
-    _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
+    _delayedShutdownTimeMs = config.getLong(CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
-    _liveInstanceChangeHandler = liveInstanceChangeHandler;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
-    _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
+    _queryQuotaManager = queryQuotaManager;
+    _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_CONFIG_PREFIX));
     _metricsRegistry = new MetricsRegistry();
     MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
     MetricsHelper.registerMetricsRegistry(_metricsRegistry);
-    _brokerMetrics = new BrokerMetrics(_metricsRegistry, !_config.getBoolean(TABLE_LEVEL_METRICS_CONFIG, true));
+    _brokerMetrics =
+        new BrokerMetrics(_metricsRegistry, !_config.getBoolean(CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, true));
     _brokerMetrics.initializeGlobalMeters();
     _brokerRequestHandler = buildRequestHandler();
     _brokerAdminApplication = new BrokerAdminApiApplication(this);
   }
 
   private BrokerRequestHandler buildRequestHandler() {
-    String requestHandlerType = _config.getString(REQUEST_HANDLER_TYPE_CONFIG, DEFAULT_REQUEST_HANDLER_TYPE);
+    String requestHandlerType = _config.getString(CONFIG_OF_REQUEST_HANDLER_TYPE, DEFAULT_REQUEST_HANDLER_TYPE);
     if (requestHandlerType.equalsIgnoreCase(CONNECTION_POOL_REQUEST_HANDLER_TYPE)) {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeHandler, _metricsRegistry);
+          _queryQuotaManager, _brokerMetrics, _metricsRegistry);
     } else {
       LOGGER.info("Using SingleConnectionBrokerRequestHandler");
       return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
-          _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+          _accessControlFactory, _queryQuotaManager, _brokerMetrics);
     }
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 1004f5a..81f7cfd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -27,7 +27,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TableConfig;
@@ -50,15 +50,15 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final HelixDataAccessor _helixDataAccessor;
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
 
   public BrokerResourceOnlineOfflineStateModelFactory(ZkHelixPropertyStore<ZNRecord> propertyStore,
       HelixDataAccessor helixDataAccessor, HelixExternalViewBasedRouting helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager) {
+      HelixBasedQueryQuotaManager helixBasedQueryQuotaManager) {
     _helixDataAccessor = helixDataAccessor;
     _propertyStore = propertyStore;
     _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    _helixBasedQueryQuotaManager = helixBasedQueryQuotaManager;
   }
 
   public static String getStateModelDef() {
@@ -83,7 +83,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         _helixExternalViewBasedRouting.markDataResourceOnline(tableConfig,
             _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName)),
             _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
-        _tableQueryQuotaManager.initTableQueryQuota(tableConfig,
+        _helixBasedQueryQuotaManager.initTableQueryQuota(tableConfig,
             _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
       } catch (Exception e) {
         LOGGER.error("Caught exception during OFFLINE -> ONLINE transition", e);
@@ -98,7 +98,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during ONLINE -> OFFLINE transition", e);
         Utils.rethrowException(e);
@@ -112,7 +112,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during OFFLINE -> DROPPED transition", e);
         Utils.rethrowException(e);
@@ -126,7 +126,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during ONLINE -> DROPPED transition", e);
         Utils.rethrowException(e);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index d684d09..80b83a6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -42,8 +42,11 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
+import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -58,7 +61,6 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("unused")
 public class HelixBrokerStarter {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
-  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
 
   private final Configuration _brokerConf;
   private final String _clusterName;
@@ -77,13 +79,11 @@ public class HelixBrokerStarter {
 
   // Cluster change handlers
   private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private TableQueryQuotaManager _tableQueryQuotaManager;
+  private HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
   private LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private ClusterChangeMediator _clusterChangeMediator;
 
   private BrokerServerBuilder _brokerServerBuilder;
-  private AccessControlFactory _accessControlFactory;
-  private MetricsRegistry _metricsRegistry;
 
   // Participant Helix manager handles Helix functionality such as state transitions and messages
   private HelixManager _participantHelixManager;
@@ -149,10 +149,10 @@ public class HelixBrokerStarter {
     _liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
   }
 
-  // TODO: refactor this logic into BrokerServerBuilder
   public void start()
       throws Exception {
     LOGGER.info("Starting Pinot broker");
+    Utils.logVersions();
 
     // Connect the spectator Helix manager
     LOGGER.info("Connecting spectator Helix manager");
@@ -163,22 +163,24 @@ public class HelixBrokerStarter {
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting =
-        new HelixExternalViewBasedRouting(_brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+        new HelixExternalViewBasedRouting(_brokerConf.subset(CommonConstants.Broker.ROUTING_TABLE_CONFIG_PREFIX));
     _helixExternalViewBasedRouting.init(_spectatorHelixManager);
-    _tableQueryQuotaManager = new TableQueryQuotaManager();
-    _tableQueryQuotaManager.init(_spectatorHelixManager);
+    _helixBasedQueryQuotaManager = new HelixBasedQueryQuotaManager();
+    _helixBasedQueryQuotaManager.init(_spectatorHelixManager);
     _liveInstanceChangeHandler = new LiveInstanceChangeHandler();
     _liveInstanceChangeHandler.init(_spectatorHelixManager);
 
     // Set up the broker server builder
     LOGGER.info("Setting up broker server builder");
     _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
-    _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
-    _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixBasedQueryQuotaManager);
+    BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler();
+    if (brokerRequestHandler instanceof ConnectionPoolBrokerRequestHandler) {
+      _liveInstanceChangeHandler.init(((ConnectionPoolBrokerRequestHandler) brokerRequestHandler).getConnPool());
+    }
     BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
-    _tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+    _helixBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics);
     _brokerServerBuilder.start();
 
     // Initialize the cluster change mediator
@@ -187,7 +189,7 @@ public class HelixBrokerStarter {
       externalViewChangeHandler.init(_spectatorHelixManager);
     }
     _externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
-    _externalViewChangeHandlers.add(_tableQueryQuotaManager);
+    _externalViewChangeHandlers.add(_helixBasedQueryQuotaManager);
     for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
       instanceConfigChangeHandler.init(_spectatorHelixManager);
     }
@@ -213,7 +215,7 @@ public class HelixBrokerStarter {
     StateMachineEngine stateMachineEngine = _participantHelixManager.getStateMachineEngine();
     StateModelFactory<?> stateModelFactory =
         new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor,
-            _helixExternalViewBasedRouting, _tableQueryQuotaManager);
+            _helixExternalViewBasedRouting, _helixBasedQueryQuotaManager);
     stateMachineEngine
         .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory);
     _participantHelixManager.connect();
@@ -288,7 +290,7 @@ public class HelixBrokerStarter {
   }
 
   public AccessControlFactory getAccessControlFactory() {
-    return _accessControlFactory;
+    return _brokerServerBuilder.getAccessControlFactory();
   }
 
   public HelixManager getSpectatorHelixManager() {
@@ -304,7 +306,7 @@ public class HelixBrokerStarter {
   }
 
   public MetricsRegistry getMetricsRegistry() {
-    return _metricsRegistry;
+    return _brokerServerBuilder.getMetricsRegistry();
   }
 
   public static HelixBrokerStarter getDefault()
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
similarity index 97%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
index 1a685d2..af0f8ae 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
@@ -46,8 +46,8 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 
-public class TableQueryQuotaManager implements ClusterChangeHandler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(TableQueryQuotaManager.class);
+public class HelixBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBasedQueryQuotaManager.class);
   private static final int TIME_RANGE_IN_SECOND = 1;
 
   private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1);
@@ -58,7 +58,7 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
 
   @Override
   public void init(HelixManager helixManager) {
-    Preconditions.checkState(_helixManager == null, "TableQueryQuotaManager is already initialized");
+    Preconditions.checkState(_helixManager == null, "HelixBasedQueryQuotaManager is already initialized");
     _helixManager = helixManager;
   }
 
@@ -190,10 +190,12 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
   }
 
   /**
-   * Acquire a token from rate limiter based on the table name.
-   * @param tableName original table name which could be raw.
+   * {@inheritDoc}
+   * <p>Acquires a token from rate limiter based on the table name.
+   *
    * @return true if there is no query quota specified for the table or a token can be acquired, otherwise return false.
    */
+  @Override
   public boolean acquire(String tableName) {
     LOGGER.debug("Trying to acquire token for table: {}", tableName);
     String offlineTableName = null;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
new file mode 100644
index 0000000..6fd335e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+public interface QueryQuotaManager {
+
+  /**
+   * Try to acquire a quota for the given table.
+   * @param tableName Table name with or without type suffix
+   * @return {@code true} if the table quota has not been reached, {@code false} otherwise
+   */
+  boolean acquire(String tableName);
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 20d384e..6981280 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -72,7 +72,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final RoutingTable _routingTable;
   protected final TimeBoundaryService _timeBoundaryService;
   protected final AccessControlFactory _accessControlFactory;
-  protected final TableQueryQuotaManager _tableQueryQuotaManager;
+  protected final QueryQuotaManager _queryQuotaManager;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
@@ -90,12 +90,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    _queryQuotaManager = queryQuotaManager;
     _brokerMetrics = brokerMetrics;
 
     _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
@@ -108,9 +108,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _numDroppedLog = new AtomicInteger(0);
     _numDroppedLogRateLimiter = RateLimiter.create(1.0);
 
-    LOGGER.info(
-        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
-        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
+    LOGGER
+        .info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+            _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
   }
 
   private String getDefaultBrokerId() {
@@ -200,7 +200,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
 
     // Validate QPS quota
-    if (!_tableQueryQuotaManager.acquire(tableName)) {
+    if (!_queryQuotaManager.acquire(tableName)) {
       String errorMessage =
           String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query);
       LOGGER.info(errorMessage);
@@ -305,7 +305,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
-    if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
+    if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
       LOGGER.info(
           "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 0202315..60a4312 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -36,8 +36,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -79,7 +78,6 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
   private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
   private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
 
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final EventLoopGroup _eventLoopGroup;
   private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
   private final ExecutorService _requestSenderPool;
@@ -88,10 +86,8 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
 
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstanceChangeHandler liveInstanceChangeHandler, MetricsRegistry metricsRegistry) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
-    _liveInstanceChangeHandler = liveInstanceChangeHandler;
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, MetricsRegistry metricsRegistry) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
 
     TransportClientConf transportClientConf = new TransportClientConf();
     transportClientConf.init(_config.subset(TRANSPORT_CONFIG_PREFIX));
@@ -116,10 +112,13 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
     _scatterGather = new ScatterGatherImpl(_connPool, _requestSenderPool);
   }
 
+  public KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> getConnPool() {
+    return _connPool;
+  }
+
   @Override
   public synchronized void start() {
     _connPool.start();
-    _liveInstanceChangeHandler.init(_connPool);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 4231e52..d7a4890 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -55,8 +55,8 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
similarity index 79%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
rename to pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
index 3e5eae2..852748f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
@@ -44,10 +44,10 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 
-public class TableQueryQuotaManagerTest {
+public class HelixBasedQueryQuotaManagerTest {
   private ZkHelixPropertyStore<ZNRecord> _testPropertyStore;
   private HelixManager _helixManager;
-  private TableQueryQuotaManager _tableQueryQuotaManager;
+  private HelixBasedQueryQuotaManager _queryQuotaManager;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private static String RAW_TABLE_NAME = "testTable";
   private static String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
@@ -62,8 +62,8 @@ public class TableQueryQuotaManagerTest {
     _helixManager = initHelixManager(helixClusterName);
     _testPropertyStore = _helixManager.getHelixPropertyStore();
 
-    _tableQueryQuotaManager = new TableQueryQuotaManager();
-    _tableQueryQuotaManager.init(_helixManager);
+    _queryQuotaManager = new HelixBasedQueryQuotaManager();
+    _queryQuotaManager.init(_helixManager);
   }
 
   private HelixManager initHelixManager(String helixClusterName) {
@@ -101,7 +101,7 @@ public class TableQueryQuotaManagerTest {
       ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, OFFLINE_TABLE_NAME);
       ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME);
     }
-    _tableQueryQuotaManager.cleanUpRateLimiterMap();
+    _queryQuotaManager.cleanUpRateLimiterMap();
   }
 
   @AfterTest
@@ -118,14 +118,14 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     // All the request should be passed.
     runQueries(70, 10);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -133,8 +133,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -151,12 +151,12 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Nothing happened since it doesn't have qps quota.
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -174,12 +174,12 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Drop the offline table won't have any affect since it is table type specific.
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -210,22 +210,22 @@ public class TableQueryQuotaManagerTest {
     ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     // Since each table has 2 online brokers, per broker rate becomes 100.0 / 2 = 50.0
-    _tableQueryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
-    _tableQueryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
+    _queryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
     // The hash map now contains 2 entries for both of the tables.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 2);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
 
     // Rate limiter generates 1 token every 10 milliseconds, have to make it sleep for a while.
     runQueries(70, 10L);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
     // Since real-time table still has the qps quota, the size of the hash map becomes 1.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+    _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
     // Since the only 1 table which has qps quota has been dropped, the size of the hash map becomes 0.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -234,13 +234,13 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     runQueries(70, 10L);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -248,8 +248,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -265,8 +265,8 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -283,8 +283,8 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -296,8 +296,8 @@ public class TableQueryQuotaManagerTest {
     QuotaConfig quotaConfig = new QuotaConfig();
     quotaConfig.setMaxQueriesPerSecond("InvalidQpsQuota");
     tableConfig.setQuotaConfig(quotaConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -309,8 +309,8 @@ public class TableQueryQuotaManagerTest {
     QuotaConfig quotaConfig = new QuotaConfig();
     quotaConfig.setMaxQueriesPerSecond("-1.0");
     tableConfig.setQuotaConfig(quotaConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -318,8 +318,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, null);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, null);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -328,8 +328,8 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
   }
 
   @Test
@@ -339,11 +339,11 @@ public class TableQueryQuotaManagerTest {
     brokerResource.setState(OFFLINE_TABLE_NAME, "broker_instance_2", "OFFLINE");
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
 
     // For the 1st version we don't check the number of online brokers.
     // Thus the expected size now is 1. It'll be 0 when we bring dynamic rate back.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
   }
 
   private TableConfig generateDefaultTableConfig(String tableName) {
@@ -370,7 +370,7 @@ public class TableQueryQuotaManagerTest {
       throws InterruptedException {
     int count = 0;
     for (int i = 0; i < numOfTimesToRun; i++) {
-      Assert.assertTrue(_tableQueryQuotaManager.acquire(RAW_TABLE_NAME));
+      Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME));
       count++;
       Thread.sleep(millis);
     }
@@ -380,7 +380,7 @@ public class TableQueryQuotaManagerTest {
     count = 0;
     millis /= 2;
     for (int i = 0; i < numOfTimesToRun; i++) {
-      if (!_tableQueryQuotaManager.acquire(RAW_TABLE_NAME)) {
+      if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) {
         count++;
       }
       Thread.sleep(millis);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 8ed7999..fee6962 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,18 @@ public class CommonConstants {
   }
 
   public static class Broker {
+    public static final String ROUTING_TABLE_CONFIG_PREFIX = "pinot.broker.routing.table";
+    public static final String ACCESS_CONTROL_CONFIG_PREFIX = "pinot.broker.access.control";
+    public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
+
+    public static final String CONFIG_OF_DELAY_SHUTDOWN_TIME_MS = "pinot.broker.delayShutdownTimeMs";
+    public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000L;
+    public static final String CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS = "pinot.broker.enableTableLevelMetrics";
+    public static final String CONFIG_OF_REQUEST_HANDLER_TYPE = "pinot.broker.requestHandlerType";
+    public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
+    public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
+    public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
+
     public static final String CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT = "pinot.broker.query.response.limit";
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d671311..f0580a0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -44,7 +44,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpStatus;
-import org.apache.pinot.broker.broker.BrokerServerBuilder;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -120,11 +119,12 @@ public abstract class ClusterTest extends ControllerTest {
       Configuration brokerConf = new BaseConfiguration();
       brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
       brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i));
-      brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+      brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
       // Randomly choose to use connection-pool or single-connection request handler
       if (RANDOM.nextBoolean()) {
-        brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
-            BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
+        brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.CONNECTION_POOL_REQUEST_HANDLER_TYPE);
+      } else {
+        brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
       overrideBrokerConf(brokerConf);
       HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, _clusterName, zkStr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org