You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/19 04:57:07 UTC

[incubator-pinot] branch decouple_broker created (now 3594a32)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch decouple_broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 3594a32  Decouple BrokerServerBuilder from Helix

This branch includes the following new commits:

     new 3594a32  Decouple BrokerServerBuilder from Helix

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Decouple BrokerServerBuilder from Helix

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch decouple_broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3594a32faad7401ba87fe4bbbe5f3b09e54d4b4f
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Apr 18 21:54:09 2019 -0700

    Decouple BrokerServerBuilder from Helix
    
    BrokerServerBuilder should be independent of Helix and should only
    perform broker functionality.
    
    - Add QueryQuotaManager interface for quota management
    - Remove LiveInstanceChangeHandler from BrokerServerBuilder
      constructor
    - Move constants into CommonConstants
---
 .../pinot/broker/broker/BrokerServerBuilder.java   | 39 ++++-----
 ...okerResourceOnlineOfflineStateModelFactory.java | 16 ++--
 .../broker/broker/helix/HelixBrokerStarter.java    | 36 ++++----
 ...nager.java => HelixBasedQueryQuotaManager.java} | 12 +--
 .../pinot/broker/queryquota/QueryQuotaManager.java | 29 +++++++
 .../requesthandler/BaseBrokerRequestHandler.java   | 18 ++--
 .../ConnectionPoolBrokerRequestHandler.java        | 15 ++--
 .../SingleConnectionBrokerRequestHandler.java      |  6 +-
 ...t.java => HelixBasedQueryQuotaManagerTest.java} | 98 +++++++++++-----------
 .../apache/pinot/common/utils/CommonConstants.java | 12 +++
 .../pinot/integration/tests/ClusterTest.java       |  8 +-
 11 files changed, 161 insertions(+), 128 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index a63e954..1880cc5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -22,8 +22,7 @@ import com.google.common.base.Preconditions;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
 import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
@@ -36,33 +35,24 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
+
 
 public class BrokerServerBuilder {
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class);
 
-  public static final String DELAY_SHUTDOWN_TIME_MS_CONFIG = "pinot.broker.delayShutdownTimeMs";
-  public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000;
-  public static final String ACCESS_CONTROL_PREFIX = "pinot.broker.access.control";
-  public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
-  public static final String TABLE_LEVEL_METRICS_CONFIG = "pinot.broker.enableTableLevelMetrics";
-  public static final String REQUEST_HANDLER_TYPE_CONFIG = "pinot.broker.requestHandlerType";
-  public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
-  public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
-  public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
-
   public enum State {
     INIT, STARTING, RUNNING, SHUTTING_DOWN, SHUTDOWN
   }
 
   // Running State Of broker
-  private final AtomicReference<State> _state = new AtomicReference<>();
+  private final AtomicReference<State> _state = new AtomicReference<>(State.INIT);
 
   private final Configuration _config;
   private final long _delayedShutdownTimeMs;
   private final RoutingTable _routingTable;
   private final TimeBoundaryService _timeBoundaryService;
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final QueryQuotaManager _queryQuotaManager;
   private final AccessControlFactory _accessControlFactory;
   private final MetricsRegistry _metricsRegistry;
   private final BrokerMetrics _brokerMetrics;
@@ -70,34 +60,33 @@ public class BrokerServerBuilder {
   private final BrokerAdminApiApplication _brokerAdminApplication;
 
   public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
-      LiveInstanceChangeHandler liveInstanceChangeHandler, TableQueryQuotaManager tableQueryQuotaManager) {
-    _state.set(State.INIT);
+      QueryQuotaManager queryQuotaManager) {
     _config = config;
-    _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
+    _delayedShutdownTimeMs = config.getLong(CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
-    _liveInstanceChangeHandler = liveInstanceChangeHandler;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
-    _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
+    _queryQuotaManager = queryQuotaManager;
+    _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_CONFIG_PREFIX));
     _metricsRegistry = new MetricsRegistry();
     MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
     MetricsHelper.registerMetricsRegistry(_metricsRegistry);
-    _brokerMetrics = new BrokerMetrics(_metricsRegistry, !_config.getBoolean(TABLE_LEVEL_METRICS_CONFIG, true));
+    _brokerMetrics =
+        new BrokerMetrics(_metricsRegistry, !_config.getBoolean(CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, true));
     _brokerMetrics.initializeGlobalMeters();
     _brokerRequestHandler = buildRequestHandler();
     _brokerAdminApplication = new BrokerAdminApiApplication(this);
   }
 
   private BrokerRequestHandler buildRequestHandler() {
-    String requestHandlerType = _config.getString(REQUEST_HANDLER_TYPE_CONFIG, DEFAULT_REQUEST_HANDLER_TYPE);
+    String requestHandlerType = _config.getString(CONFIG_OF_REQUEST_HANDLER_TYPE, DEFAULT_REQUEST_HANDLER_TYPE);
     if (requestHandlerType.equalsIgnoreCase(CONNECTION_POOL_REQUEST_HANDLER_TYPE)) {
       LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
       return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
-          _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeHandler, _metricsRegistry);
+          _queryQuotaManager, _brokerMetrics, _metricsRegistry);
     } else {
       LOGGER.info("Using SingleConnectionBrokerRequestHandler");
       return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
-          _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+          _accessControlFactory, _queryQuotaManager, _brokerMetrics);
     }
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 1004f5a..81f7cfd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -27,7 +27,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TableConfig;
@@ -50,15 +50,15 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final HelixDataAccessor _helixDataAccessor;
   private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
+  private final HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
 
   public BrokerResourceOnlineOfflineStateModelFactory(ZkHelixPropertyStore<ZNRecord> propertyStore,
       HelixDataAccessor helixDataAccessor, HelixExternalViewBasedRouting helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager) {
+      HelixBasedQueryQuotaManager helixBasedQueryQuotaManager) {
     _helixDataAccessor = helixDataAccessor;
     _propertyStore = propertyStore;
     _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    _helixBasedQueryQuotaManager = helixBasedQueryQuotaManager;
   }
 
   public static String getStateModelDef() {
@@ -83,7 +83,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         _helixExternalViewBasedRouting.markDataResourceOnline(tableConfig,
             _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName)),
             _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
-        _tableQueryQuotaManager.initTableQueryQuota(tableConfig,
+        _helixBasedQueryQuotaManager.initTableQueryQuota(tableConfig,
             _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
       } catch (Exception e) {
         LOGGER.error("Caught exception during OFFLINE -> ONLINE transition", e);
@@ -98,7 +98,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during ONLINE -> OFFLINE transition", e);
         Utils.rethrowException(e);
@@ -112,7 +112,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during OFFLINE -> DROPPED transition", e);
         Utils.rethrowException(e);
@@ -126,7 +126,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
         LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
         String tableName = message.getPartitionName();
         _helixExternalViewBasedRouting.markDataResourceOffline(tableName);
-        _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+        _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
       } catch (Exception e) {
         LOGGER.error("Caught exception during ONLINE -> DROPPED transition", e);
         Utils.rethrowException(e);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index d684d09..80b83a6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -42,8 +42,11 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
+import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -58,7 +61,6 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("unused")
 public class HelixBrokerStarter {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
-  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
 
   private final Configuration _brokerConf;
   private final String _clusterName;
@@ -77,13 +79,11 @@ public class HelixBrokerStarter {
 
   // Cluster change handlers
   private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private TableQueryQuotaManager _tableQueryQuotaManager;
+  private HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
   private LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private ClusterChangeMediator _clusterChangeMediator;
 
   private BrokerServerBuilder _brokerServerBuilder;
-  private AccessControlFactory _accessControlFactory;
-  private MetricsRegistry _metricsRegistry;
 
   // Participant Helix manager handles Helix functionality such as state transitions and messages
   private HelixManager _participantHelixManager;
@@ -149,10 +149,10 @@ public class HelixBrokerStarter {
     _liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
   }
 
-  // TODO: refactor this logic into BrokerServerBuilder
   public void start()
       throws Exception {
     LOGGER.info("Starting Pinot broker");
+    Utils.logVersions();
 
     // Connect the spectator Helix manager
     LOGGER.info("Connecting spectator Helix manager");
@@ -163,22 +163,24 @@ public class HelixBrokerStarter {
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting =
-        new HelixExternalViewBasedRouting(_brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+        new HelixExternalViewBasedRouting(_brokerConf.subset(CommonConstants.Broker.ROUTING_TABLE_CONFIG_PREFIX));
     _helixExternalViewBasedRouting.init(_spectatorHelixManager);
-    _tableQueryQuotaManager = new TableQueryQuotaManager();
-    _tableQueryQuotaManager.init(_spectatorHelixManager);
+    _helixBasedQueryQuotaManager = new HelixBasedQueryQuotaManager();
+    _helixBasedQueryQuotaManager.init(_spectatorHelixManager);
     _liveInstanceChangeHandler = new LiveInstanceChangeHandler();
     _liveInstanceChangeHandler.init(_spectatorHelixManager);
 
     // Set up the broker server builder
     LOGGER.info("Setting up broker server builder");
     _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting,
-        _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
-    _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
-    _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixBasedQueryQuotaManager);
+    BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler();
+    if (brokerRequestHandler instanceof ConnectionPoolBrokerRequestHandler) {
+      _liveInstanceChangeHandler.init(((ConnectionPoolBrokerRequestHandler) brokerRequestHandler).getConnPool());
+    }
     BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
     _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
-    _tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+    _helixBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics);
     _brokerServerBuilder.start();
 
     // Initialize the cluster change mediator
@@ -187,7 +189,7 @@ public class HelixBrokerStarter {
       externalViewChangeHandler.init(_spectatorHelixManager);
     }
     _externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
-    _externalViewChangeHandlers.add(_tableQueryQuotaManager);
+    _externalViewChangeHandlers.add(_helixBasedQueryQuotaManager);
     for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
       instanceConfigChangeHandler.init(_spectatorHelixManager);
     }
@@ -213,7 +215,7 @@ public class HelixBrokerStarter {
     StateMachineEngine stateMachineEngine = _participantHelixManager.getStateMachineEngine();
     StateModelFactory<?> stateModelFactory =
         new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor,
-            _helixExternalViewBasedRouting, _tableQueryQuotaManager);
+            _helixExternalViewBasedRouting, _helixBasedQueryQuotaManager);
     stateMachineEngine
         .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory);
     _participantHelixManager.connect();
@@ -288,7 +290,7 @@ public class HelixBrokerStarter {
   }
 
   public AccessControlFactory getAccessControlFactory() {
-    return _accessControlFactory;
+    return _brokerServerBuilder.getAccessControlFactory();
   }
 
   public HelixManager getSpectatorHelixManager() {
@@ -304,7 +306,7 @@ public class HelixBrokerStarter {
   }
 
   public MetricsRegistry getMetricsRegistry() {
-    return _metricsRegistry;
+    return _brokerServerBuilder.getMetricsRegistry();
   }
 
   public static HelixBrokerStarter getDefault()
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
similarity index 97%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
index 1a685d2..af0f8ae 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
@@ -46,8 +46,8 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 
-public class TableQueryQuotaManager implements ClusterChangeHandler {
-  private static final Logger LOGGER = LoggerFactory.getLogger(TableQueryQuotaManager.class);
+public class HelixBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBasedQueryQuotaManager.class);
   private static final int TIME_RANGE_IN_SECOND = 1;
 
   private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1);
@@ -58,7 +58,7 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
 
   @Override
   public void init(HelixManager helixManager) {
-    Preconditions.checkState(_helixManager == null, "TableQueryQuotaManager is already initialized");
+    Preconditions.checkState(_helixManager == null, "HelixBasedQueryQuotaManager is already initialized");
     _helixManager = helixManager;
   }
 
@@ -190,10 +190,12 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
   }
 
   /**
-   * Acquire a token from rate limiter based on the table name.
-   * @param tableName original table name which could be raw.
+   * {@inheritDoc}
+   * <p>Acquires a token from rate limiter based on the table name.
+   *
    * @return true if there is no query quota specified for the table or a token can be acquired, otherwise return false.
    */
+  @Override
   public boolean acquire(String tableName) {
     LOGGER.debug("Trying to acquire token for table: {}", tableName);
     String offlineTableName = null;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
new file mode 100644
index 0000000..6fd335e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+public interface QueryQuotaManager {
+
+  /**
+   * Try to acquire a quota for the given table.
+   * @param tableName Table name with or without type suffix
+   * @return {@code true} if the table quota has not been reached, {@code false} otherwise
+   */
+  boolean acquire(String tableName);
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 20d384e..6981280 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -72,7 +72,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final RoutingTable _routingTable;
   protected final TimeBoundaryService _timeBoundaryService;
   protected final AccessControlFactory _accessControlFactory;
-  protected final TableQueryQuotaManager _tableQueryQuotaManager;
+  protected final QueryQuotaManager _queryQuotaManager;
   protected final BrokerMetrics _brokerMetrics;
 
   protected final AtomicLong _requestIdGenerator = new AtomicLong();
@@ -90,12 +90,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
     _config = config;
     _routingTable = routingTable;
     _timeBoundaryService = timeBoundaryService;
     _accessControlFactory = accessControlFactory;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    _queryQuotaManager = queryQuotaManager;
     _brokerMetrics = brokerMetrics;
 
     _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
@@ -108,9 +108,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _numDroppedLog = new AtomicInteger(0);
     _numDroppedLogRateLimiter = RateLimiter.create(1.0);
 
-    LOGGER.info(
-        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
-        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
+    LOGGER
+        .info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+            _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
   }
 
   private String getDefaultBrokerId() {
@@ -200,7 +200,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
 
     // Validate QPS quota
-    if (!_tableQueryQuotaManager.acquire(tableName)) {
+    if (!_queryQuotaManager.acquire(tableName)) {
       String errorMessage =
           String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query);
       LOGGER.info(errorMessage);
@@ -305,7 +305,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
-    if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
+    if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
       LOGGER.info(
           "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 0202315..60a4312 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -36,8 +36,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -79,7 +78,6 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
   private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
   private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
 
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
   private final EventLoopGroup _eventLoopGroup;
   private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
   private final ExecutorService _requestSenderPool;
@@ -88,10 +86,8 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
 
   public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
-      LiveInstanceChangeHandler liveInstanceChangeHandler, MetricsRegistry metricsRegistry) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
-    _liveInstanceChangeHandler = liveInstanceChangeHandler;
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, MetricsRegistry metricsRegistry) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
 
     TransportClientConf transportClientConf = new TransportClientConf();
     transportClientConf.init(_config.subset(TRANSPORT_CONFIG_PREFIX));
@@ -116,10 +112,13 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
     _scatterGather = new ScatterGatherImpl(_connPool, _requestSenderPool);
   }
 
+  public KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> getConnPool() {
+    return _connPool;
+  }
+
   @Override
   public synchronized void start() {
     _connPool.start();
-    _liveInstanceChangeHandler.init(_connPool);
   }
 
   @Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 4231e52..d7a4890 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.RoutingTable;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -55,8 +55,8 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
 
   public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
-      TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
-    super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+      QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
+    super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
     _queryRouter = new QueryRouter(_brokerId, brokerMetrics);
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
similarity index 79%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
rename to pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
index 3e5eae2..852748f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
@@ -44,10 +44,10 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
 import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 
 
-public class TableQueryQuotaManagerTest {
+public class HelixBasedQueryQuotaManagerTest {
   private ZkHelixPropertyStore<ZNRecord> _testPropertyStore;
   private HelixManager _helixManager;
-  private TableQueryQuotaManager _tableQueryQuotaManager;
+  private HelixBasedQueryQuotaManager _queryQuotaManager;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private static String RAW_TABLE_NAME = "testTable";
   private static String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
@@ -62,8 +62,8 @@ public class TableQueryQuotaManagerTest {
     _helixManager = initHelixManager(helixClusterName);
     _testPropertyStore = _helixManager.getHelixPropertyStore();
 
-    _tableQueryQuotaManager = new TableQueryQuotaManager();
-    _tableQueryQuotaManager.init(_helixManager);
+    _queryQuotaManager = new HelixBasedQueryQuotaManager();
+    _queryQuotaManager.init(_helixManager);
   }
 
   private HelixManager initHelixManager(String helixClusterName) {
@@ -101,7 +101,7 @@ public class TableQueryQuotaManagerTest {
       ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, OFFLINE_TABLE_NAME);
       ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME);
     }
-    _tableQueryQuotaManager.cleanUpRateLimiterMap();
+    _queryQuotaManager.cleanUpRateLimiterMap();
   }
 
   @AfterTest
@@ -118,14 +118,14 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     // All the request should be passed.
     runQueries(70, 10);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -133,8 +133,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -151,12 +151,12 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Nothing happened since it doesn't have qps quota.
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -174,12 +174,12 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
 
     // Drop the offline table won't have any affect since it is table type specific.
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -210,22 +210,22 @@ public class TableQueryQuotaManagerTest {
     ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
 
     // Since each table has 2 online brokers, per broker rate becomes 100.0 / 2 = 50.0
-    _tableQueryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
-    _tableQueryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
+    _queryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
     // The hash map now contains 2 entries for both of the tables.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 2);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
 
     // Rate limiter generates 1 token every 10 milliseconds, have to make it sleep for a while.
     runQueries(70, 10L);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+    _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
     // Since real-time table still has the qps quota, the size of the hash map becomes 1.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+    _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
     // Since the only 1 table which has qps quota has been dropped, the size of the hash map becomes 0.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -234,13 +234,13 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
 
     runQueries(70, 10L);
 
-    _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -248,8 +248,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -265,8 +265,8 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -283,8 +283,8 @@ public class TableQueryQuotaManagerTest {
 
     ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
     TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -296,8 +296,8 @@ public class TableQueryQuotaManagerTest {
     QuotaConfig quotaConfig = new QuotaConfig();
     quotaConfig.setMaxQueriesPerSecond("InvalidQpsQuota");
     tableConfig.setQuotaConfig(quotaConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -309,8 +309,8 @@ public class TableQueryQuotaManagerTest {
     QuotaConfig quotaConfig = new QuotaConfig();
     quotaConfig.setMaxQueriesPerSecond("-1.0");
     tableConfig.setQuotaConfig(quotaConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -318,8 +318,8 @@ public class TableQueryQuotaManagerTest {
       throws Exception {
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, null);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, null);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
   }
 
   @Test
@@ -328,8 +328,8 @@ public class TableQueryQuotaManagerTest {
     ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
   }
 
   @Test
@@ -339,11 +339,11 @@ public class TableQueryQuotaManagerTest {
     brokerResource.setState(OFFLINE_TABLE_NAME, "broker_instance_2", "OFFLINE");
     TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
     setQps(tableConfig);
-    _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+    _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
 
     // For the 1st version we don't check the number of online brokers.
     // Thus the expected size now is 1. It'll be 0 when we bring dynamic rate back.
-    Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+    Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
   }
 
   private TableConfig generateDefaultTableConfig(String tableName) {
@@ -370,7 +370,7 @@ public class TableQueryQuotaManagerTest {
       throws InterruptedException {
     int count = 0;
     for (int i = 0; i < numOfTimesToRun; i++) {
-      Assert.assertTrue(_tableQueryQuotaManager.acquire(RAW_TABLE_NAME));
+      Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME));
       count++;
       Thread.sleep(millis);
     }
@@ -380,7 +380,7 @@ public class TableQueryQuotaManagerTest {
     count = 0;
     millis /= 2;
     for (int i = 0; i < numOfTimesToRun; i++) {
-      if (!_tableQueryQuotaManager.acquire(RAW_TABLE_NAME)) {
+      if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) {
         count++;
       }
       Thread.sleep(millis);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 8ed7999..fee6962 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,18 @@ public class CommonConstants {
   }
 
   public static class Broker {
+    public static final String ROUTING_TABLE_CONFIG_PREFIX = "pinot.broker.routing.table";
+    public static final String ACCESS_CONTROL_CONFIG_PREFIX = "pinot.broker.access.control";
+    public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
+
+    public static final String CONFIG_OF_DELAY_SHUTDOWN_TIME_MS = "pinot.broker.delayShutdownTimeMs";
+    public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000L;
+    public static final String CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS = "pinot.broker.enableTableLevelMetrics";
+    public static final String CONFIG_OF_REQUEST_HANDLER_TYPE = "pinot.broker.requestHandlerType";
+    public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
+    public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
+    public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
+
     public static final String CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT = "pinot.broker.query.response.limit";
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d671311..f0580a0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -44,7 +44,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpStatus;
-import org.apache.pinot.broker.broker.BrokerServerBuilder;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -120,11 +119,12 @@ public abstract class ClusterTest extends ControllerTest {
       Configuration brokerConf = new BaseConfiguration();
       brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
       brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i));
-      brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+      brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
       // Randomly choose to use connection-pool or single-connection request handler
       if (RANDOM.nextBoolean()) {
-        brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
-            BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
+        brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.CONNECTION_POOL_REQUEST_HANDLER_TYPE);
+      } else {
+        brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
       overrideBrokerConf(brokerConf);
       HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, _clusterName, zkStr);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org