You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/19 04:57:08 UTC
[incubator-pinot] 01/01: Decouple BrokerServerBuilder from Helix
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch decouple_broker
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 3594a32faad7401ba87fe4bbbe5f3b09e54d4b4f
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Apr 18 21:54:09 2019 -0700
Decouple BrokerServerBuilder from Helix
BrokerServerBuilder should be independent of Helix and only
perform broker functionality.
- Add QueryQuotaManager interface for quota management
- Remove LiveInstanceChangeHandler from BrokerServerBuilder
constructor
- Move constants into CommonConstants
---
.../pinot/broker/broker/BrokerServerBuilder.java | 39 ++++-----
...okerResourceOnlineOfflineStateModelFactory.java | 16 ++--
.../broker/broker/helix/HelixBrokerStarter.java | 36 ++++----
...nager.java => HelixBasedQueryQuotaManager.java} | 12 +--
.../pinot/broker/queryquota/QueryQuotaManager.java | 29 +++++++
.../requesthandler/BaseBrokerRequestHandler.java | 18 ++--
.../ConnectionPoolBrokerRequestHandler.java | 15 ++--
.../SingleConnectionBrokerRequestHandler.java | 6 +-
...t.java => HelixBasedQueryQuotaManagerTest.java} | 98 +++++++++++-----------
.../apache/pinot/common/utils/CommonConstants.java | 12 +++
.../pinot/integration/tests/ClusterTest.java | 8 +-
11 files changed, 161 insertions(+), 128 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
index a63e954..1880cc5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java
@@ -22,8 +22,7 @@ import com.google.common.base.Preconditions;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
@@ -36,33 +35,24 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
+
public class BrokerServerBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class);
- public static final String DELAY_SHUTDOWN_TIME_MS_CONFIG = "pinot.broker.delayShutdownTimeMs";
- public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000;
- public static final String ACCESS_CONTROL_PREFIX = "pinot.broker.access.control";
- public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
- public static final String TABLE_LEVEL_METRICS_CONFIG = "pinot.broker.enableTableLevelMetrics";
- public static final String REQUEST_HANDLER_TYPE_CONFIG = "pinot.broker.requestHandlerType";
- public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
- public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
- public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
-
public enum State {
INIT, STARTING, RUNNING, SHUTTING_DOWN, SHUTDOWN
}
// Running State Of broker
- private final AtomicReference<State> _state = new AtomicReference<>();
+ private final AtomicReference<State> _state = new AtomicReference<>(State.INIT);
private final Configuration _config;
private final long _delayedShutdownTimeMs;
private final RoutingTable _routingTable;
private final TimeBoundaryService _timeBoundaryService;
- private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
+ private final QueryQuotaManager _queryQuotaManager;
private final AccessControlFactory _accessControlFactory;
private final MetricsRegistry _metricsRegistry;
private final BrokerMetrics _brokerMetrics;
@@ -70,34 +60,33 @@ public class BrokerServerBuilder {
private final BrokerAdminApiApplication _brokerAdminApplication;
public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService,
- LiveInstanceChangeHandler liveInstanceChangeHandler, TableQueryQuotaManager tableQueryQuotaManager) {
- _state.set(State.INIT);
+ QueryQuotaManager queryQuotaManager) {
_config = config;
- _delayedShutdownTimeMs = config.getLong(DELAY_SHUTDOWN_TIME_MS_CONFIG, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
+ _delayedShutdownTimeMs = config.getLong(CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, DEFAULT_DELAY_SHUTDOWN_TIME_MS);
_routingTable = routingTable;
_timeBoundaryService = timeBoundaryService;
- _liveInstanceChangeHandler = liveInstanceChangeHandler;
- _tableQueryQuotaManager = tableQueryQuotaManager;
- _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_PREFIX));
+ _queryQuotaManager = queryQuotaManager;
+ _accessControlFactory = AccessControlFactory.loadFactory(_config.subset(ACCESS_CONTROL_CONFIG_PREFIX));
_metricsRegistry = new MetricsRegistry();
MetricsHelper.initializeMetrics(config.subset(METRICS_CONFIG_PREFIX));
MetricsHelper.registerMetricsRegistry(_metricsRegistry);
- _brokerMetrics = new BrokerMetrics(_metricsRegistry, !_config.getBoolean(TABLE_LEVEL_METRICS_CONFIG, true));
+ _brokerMetrics =
+ new BrokerMetrics(_metricsRegistry, !_config.getBoolean(CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS, true));
_brokerMetrics.initializeGlobalMeters();
_brokerRequestHandler = buildRequestHandler();
_brokerAdminApplication = new BrokerAdminApiApplication(this);
}
private BrokerRequestHandler buildRequestHandler() {
- String requestHandlerType = _config.getString(REQUEST_HANDLER_TYPE_CONFIG, DEFAULT_REQUEST_HANDLER_TYPE);
+ String requestHandlerType = _config.getString(CONFIG_OF_REQUEST_HANDLER_TYPE, DEFAULT_REQUEST_HANDLER_TYPE);
if (requestHandlerType.equalsIgnoreCase(CONNECTION_POOL_REQUEST_HANDLER_TYPE)) {
LOGGER.info("Using ConnectionPoolBrokerRequestHandler");
return new ConnectionPoolBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory,
- _tableQueryQuotaManager, _brokerMetrics, _liveInstanceChangeHandler, _metricsRegistry);
+ _queryQuotaManager, _brokerMetrics, _metricsRegistry);
} else {
LOGGER.info("Using SingleConnectionBrokerRequestHandler");
return new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService,
- _accessControlFactory, _tableQueryQuotaManager, _brokerMetrics);
+ _accessControlFactory, _queryQuotaManager, _brokerMetrics);
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
index 1004f5a..81f7cfd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BrokerResourceOnlineOfflineStateModelFactory.java
@@ -27,7 +27,7 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TableConfig;
@@ -50,15 +50,15 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final HelixDataAccessor _helixDataAccessor;
private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
+ private final HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
public BrokerResourceOnlineOfflineStateModelFactory(ZkHelixPropertyStore<ZNRecord> propertyStore,
HelixDataAccessor helixDataAccessor, HelixExternalViewBasedRouting helixExternalViewBasedRouting,
- TableQueryQuotaManager tableQueryQuotaManager) {
+ HelixBasedQueryQuotaManager helixBasedQueryQuotaManager) {
_helixDataAccessor = helixDataAccessor;
_propertyStore = propertyStore;
_helixExternalViewBasedRouting = helixExternalViewBasedRouting;
- _tableQueryQuotaManager = tableQueryQuotaManager;
+ _helixBasedQueryQuotaManager = helixBasedQueryQuotaManager;
}
public static String getStateModelDef() {
@@ -83,7 +83,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
_helixExternalViewBasedRouting.markDataResourceOnline(tableConfig,
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableName)),
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs()));
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig,
+ _helixBasedQueryQuotaManager.initTableQueryQuota(tableConfig,
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(BROKER_RESOURCE_INSTANCE)));
} catch (Exception e) {
LOGGER.error("Caught exception during OFFLINE -> ONLINE transition", e);
@@ -98,7 +98,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message);
String tableName = message.getPartitionName();
_helixExternalViewBasedRouting.markDataResourceOffline(tableName);
- _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+ _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
} catch (Exception e) {
LOGGER.error("Caught exception during ONLINE -> OFFLINE transition", e);
Utils.rethrowException(e);
@@ -112,7 +112,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message);
String tableName = message.getPartitionName();
_helixExternalViewBasedRouting.markDataResourceOffline(tableName);
- _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+ _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
} catch (Exception e) {
LOGGER.error("Caught exception during OFFLINE -> DROPPED transition", e);
Utils.rethrowException(e);
@@ -126,7 +126,7 @@ public class BrokerResourceOnlineOfflineStateModelFactory extends StateModelFact
LOGGER.info("BrokerResourceOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message);
String tableName = message.getPartitionName();
_helixExternalViewBasedRouting.markDataResourceOffline(tableName);
- _tableQueryQuotaManager.dropTableQueryQuota(tableName);
+ _helixBasedQueryQuotaManager.dropTableQueryQuota(tableName);
} catch (Exception e) {
LOGGER.error("Caught exception during ONLINE -> DROPPED transition", e);
Utils.rethrowException(e);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index d684d09..80b83a6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -42,8 +42,11 @@ import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.HelixBasedQueryQuotaManager;
+import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
+import org.apache.pinot.broker.requesthandler.ConnectionPoolBrokerRequestHandler;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.TagNameUtils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -58,7 +61,6 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("unused")
public class HelixBrokerStarter {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
- private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
private final Configuration _brokerConf;
private final String _clusterName;
@@ -77,13 +79,11 @@ public class HelixBrokerStarter {
// Cluster change handlers
private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private TableQueryQuotaManager _tableQueryQuotaManager;
+ private HelixBasedQueryQuotaManager _helixBasedQueryQuotaManager;
private LiveInstanceChangeHandler _liveInstanceChangeHandler;
private ClusterChangeMediator _clusterChangeMediator;
private BrokerServerBuilder _brokerServerBuilder;
- private AccessControlFactory _accessControlFactory;
- private MetricsRegistry _metricsRegistry;
// Participant Helix manager handles Helix functionality such as state transitions and messages
private HelixManager _participantHelixManager;
@@ -149,10 +149,10 @@ public class HelixBrokerStarter {
_liveInstanceChangeHandlers.add(liveInstanceChangeHandler);
}
- // TODO: refactor this logic into BrokerServerBuilder
public void start()
throws Exception {
LOGGER.info("Starting Pinot broker");
+ Utils.logVersions();
// Connect the spectator Helix manager
LOGGER.info("Connecting spectator Helix manager");
@@ -163,22 +163,24 @@ public class HelixBrokerStarter {
_propertyStore = _spectatorHelixManager.getHelixPropertyStore();
_helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
_helixExternalViewBasedRouting =
- new HelixExternalViewBasedRouting(_brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+ new HelixExternalViewBasedRouting(_brokerConf.subset(CommonConstants.Broker.ROUTING_TABLE_CONFIG_PREFIX));
_helixExternalViewBasedRouting.init(_spectatorHelixManager);
- _tableQueryQuotaManager = new TableQueryQuotaManager();
- _tableQueryQuotaManager.init(_spectatorHelixManager);
+ _helixBasedQueryQuotaManager = new HelixBasedQueryQuotaManager();
+ _helixBasedQueryQuotaManager.init(_spectatorHelixManager);
_liveInstanceChangeHandler = new LiveInstanceChangeHandler();
_liveInstanceChangeHandler.init(_spectatorHelixManager);
// Set up the broker server builder
LOGGER.info("Setting up broker server builder");
_brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting,
- _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
- _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
- _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+ _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixBasedQueryQuotaManager);
+ BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler();
+ if (brokerRequestHandler instanceof ConnectionPoolBrokerRequestHandler) {
+ _liveInstanceChangeHandler.init(((ConnectionPoolBrokerRequestHandler) brokerRequestHandler).getConnPool());
+ }
BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
_helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
- _tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
+ _helixBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics);
_brokerServerBuilder.start();
// Initialize the cluster change mediator
@@ -187,7 +189,7 @@ public class HelixBrokerStarter {
externalViewChangeHandler.init(_spectatorHelixManager);
}
_externalViewChangeHandlers.add(_helixExternalViewBasedRouting);
- _externalViewChangeHandlers.add(_tableQueryQuotaManager);
+ _externalViewChangeHandlers.add(_helixBasedQueryQuotaManager);
for (ClusterChangeHandler instanceConfigChangeHandler : _instanceConfigChangeHandlers) {
instanceConfigChangeHandler.init(_spectatorHelixManager);
}
@@ -213,7 +215,7 @@ public class HelixBrokerStarter {
StateMachineEngine stateMachineEngine = _participantHelixManager.getStateMachineEngine();
StateModelFactory<?> stateModelFactory =
new BrokerResourceOnlineOfflineStateModelFactory(_propertyStore, _helixDataAccessor,
- _helixExternalViewBasedRouting, _tableQueryQuotaManager);
+ _helixExternalViewBasedRouting, _helixBasedQueryQuotaManager);
stateMachineEngine
.registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory);
_participantHelixManager.connect();
@@ -288,7 +290,7 @@ public class HelixBrokerStarter {
}
public AccessControlFactory getAccessControlFactory() {
- return _accessControlFactory;
+ return _brokerServerBuilder.getAccessControlFactory();
}
public HelixManager getSpectatorHelixManager() {
@@ -304,7 +306,7 @@ public class HelixBrokerStarter {
}
public MetricsRegistry getMetricsRegistry() {
- return _metricsRegistry;
+ return _brokerServerBuilder.getMetricsRegistry();
}
public static HelixBrokerStarter getDefault()
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
similarity index 97%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
index 1a685d2..af0f8ae 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManager.java
@@ -46,8 +46,8 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-public class TableQueryQuotaManager implements ClusterChangeHandler {
- private static final Logger LOGGER = LoggerFactory.getLogger(TableQueryQuotaManager.class);
+public class HelixBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HelixBasedQueryQuotaManager.class);
private static final int TIME_RANGE_IN_SECOND = 1;
private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1);
@@ -58,7 +58,7 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
@Override
public void init(HelixManager helixManager) {
- Preconditions.checkState(_helixManager == null, "TableQueryQuotaManager is already initialized");
+ Preconditions.checkState(_helixManager == null, "HelixBasedQueryQuotaManager is already initialized");
_helixManager = helixManager;
}
@@ -190,10 +190,12 @@ public class TableQueryQuotaManager implements ClusterChangeHandler {
}
/**
- * Acquire a token from rate limiter based on the table name.
- * @param tableName original table name which could be raw.
+ * {@inheritDoc}
+ * <p>Acquires a token from rate limiter based on the table name.
+ *
* @return true if there is no query quota specified for the table or a token can be acquired, otherwise return false.
*/
+ @Override
public boolean acquire(String tableName) {
LOGGER.debug("Trying to acquire token for table: {}", tableName);
String offlineTableName = null;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
new file mode 100644
index 0000000..6fd335e
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/QueryQuotaManager.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.queryquota;
+
+public interface QueryQuotaManager {
+
+ /**
+ * Try to acquire a quota for the given table.
+ * @param tableName Table name with or without type suffix
+ * @return {@code true} if the table quota has not been reached, {@code false} otherwise
+ */
+ boolean acquire(String tableName);
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 20d384e..6981280 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.RoutingTableLookupRequest;
import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -72,7 +72,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final RoutingTable _routingTable;
protected final TimeBoundaryService _timeBoundaryService;
protected final AccessControlFactory _accessControlFactory;
- protected final TableQueryQuotaManager _tableQueryQuotaManager;
+ protected final QueryQuotaManager _queryQuotaManager;
protected final BrokerMetrics _brokerMetrics;
protected final AtomicLong _requestIdGenerator = new AtomicLong();
@@ -90,12 +90,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
+ QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
_config = config;
_routingTable = routingTable;
_timeBoundaryService = timeBoundaryService;
_accessControlFactory = accessControlFactory;
- _tableQueryQuotaManager = tableQueryQuotaManager;
+ _queryQuotaManager = queryQuotaManager;
_brokerMetrics = brokerMetrics;
_brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
@@ -108,9 +108,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
- LOGGER.info(
- "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
- _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
+ LOGGER
+ .info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+ _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
}
private String getDefaultBrokerId() {
@@ -200,7 +200,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
// Validate QPS quota
- if (!_tableQueryQuotaManager.acquire(tableName)) {
+ if (!_queryQuotaManager.acquire(tableName)) {
String errorMessage =
String.format("Request %d exceeds query quota for table:%s, query:%s", requestId, tableName, query);
LOGGER.info(errorMessage);
@@ -305,7 +305,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
LOGGER.debug("Broker Response: {}", brokerResponse);
- if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
+ if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info(
"RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
index 0202315..60a4312 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/ConnectionPoolBrokerRequestHandler.java
@@ -36,8 +36,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.broker.helix.LiveInstanceChangeHandler;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.TimeBoundaryService;
import org.apache.pinot.common.config.TableNameBuilder;
@@ -79,7 +78,6 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionPoolBrokerRequestHandler.class);
private static final String TRANSPORT_CONFIG_PREFIX = "pinot.broker.transport";
- private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
private final EventLoopGroup _eventLoopGroup;
private final ScheduledThreadPoolExecutor _poolTimeoutExecutor;
private final ExecutorService _requestSenderPool;
@@ -88,10 +86,8 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
public ConnectionPoolBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics,
- LiveInstanceChangeHandler liveInstanceChangeHandler, MetricsRegistry metricsRegistry) {
- super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
- _liveInstanceChangeHandler = liveInstanceChangeHandler;
+ QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, MetricsRegistry metricsRegistry) {
+ super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
TransportClientConf transportClientConf = new TransportClientConf();
transportClientConf.init(_config.subset(TRANSPORT_CONFIG_PREFIX));
@@ -116,10 +112,13 @@ public class ConnectionPoolBrokerRequestHandler extends BaseBrokerRequestHandler
_scatterGather = new ScatterGatherImpl(_connPool, _requestSenderPool);
}
+ public KeyedPool<PooledNettyClientResourceManager.PooledClientConnection> getConnPool() {
+ return _connPool;
+ }
+
@Override
public synchronized void start() {
_connPool.start();
- _liveInstanceChangeHandler.init(_connPool);
}
@Override
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 4231e52..d7a4890 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.broker.AccessControlFactory;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.TimeBoundaryService;
import org.apache.pinot.common.config.TableNameBuilder;
@@ -55,8 +55,8 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
- TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
- super(config, routingTable, timeBoundaryService, accessControlFactory, tableQueryQuotaManager, brokerMetrics);
+ QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
+ super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics);
_queryRouter = new QueryRouter(_brokerId, brokerMetrics);
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
similarity index 79%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
rename to pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
index 3e5eae2..852748f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/TableQueryQuotaManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/HelixBasedQueryQuotaManagerTest.java
@@ -44,10 +44,10 @@ import static org.apache.pinot.common.utils.CommonConstants.Helix.BROKER_RESOURC
import static org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-public class TableQueryQuotaManagerTest {
+public class HelixBasedQueryQuotaManagerTest {
private ZkHelixPropertyStore<ZNRecord> _testPropertyStore;
private HelixManager _helixManager;
- private TableQueryQuotaManager _tableQueryQuotaManager;
+ private HelixBasedQueryQuotaManager _queryQuotaManager;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private static String RAW_TABLE_NAME = "testTable";
private static String OFFLINE_TABLE_NAME = RAW_TABLE_NAME + "_OFFLINE";
@@ -62,8 +62,8 @@ public class TableQueryQuotaManagerTest {
_helixManager = initHelixManager(helixClusterName);
_testPropertyStore = _helixManager.getHelixPropertyStore();
- _tableQueryQuotaManager = new TableQueryQuotaManager();
- _tableQueryQuotaManager.init(_helixManager);
+ _queryQuotaManager = new HelixBasedQueryQuotaManager();
+ _queryQuotaManager.init(_helixManager);
}
private HelixManager initHelixManager(String helixClusterName) {
@@ -101,7 +101,7 @@ public class TableQueryQuotaManagerTest {
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, OFFLINE_TABLE_NAME);
ZKMetadataProvider.removeResourceConfigFromPropertyStore(_testPropertyStore, REALTIME_TABLE_NAME);
}
- _tableQueryQuotaManager.cleanUpRateLimiterMap();
+ _queryQuotaManager.cleanUpRateLimiterMap();
}
@AfterTest
@@ -118,14 +118,14 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
setQps(tableConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
// All the requests should pass.
runQueries(70, 10);
- _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -133,8 +133,8 @@ public class TableQueryQuotaManagerTest {
throws Exception {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -151,12 +151,12 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
// Nothing happened since it doesn't have qps quota.
- _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -174,12 +174,12 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
// Dropping the offline table won't have any effect since it is table type specific.
- _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -210,22 +210,22 @@ public class TableQueryQuotaManagerTest {
ZKMetadataProvider.setOfflineTableConfig(_testPropertyStore, OFFLINE_TABLE_NAME, offlineTableConfig.toZNRecord());
// Since each table has 2 online brokers, per broker rate becomes 100.0 / 2 = 50.0
- _tableQueryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
- _tableQueryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
+ _queryQuotaManager.initTableQueryQuota(offlineTableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
+ _queryQuotaManager.initTableQueryQuota(realtimeTableConfig, brokerResource);
// The hash map now contains 2 entries for both of the tables.
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 2);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 2);
// Rate limiter generates 1 token every 10 milliseconds, have to make it sleep for a while.
runQueries(70, 10L);
- _tableQueryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
+ _queryQuotaManager.dropTableQueryQuota(OFFLINE_TABLE_NAME);
// Since real-time table still has the qps quota, the size of the hash map becomes 1.
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
- _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+ _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
// Since the only remaining table with a qps quota has been dropped, the size of the hash map becomes 0.
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -234,13 +234,13 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
setQps(tableConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
runQueries(70, 10L);
- _tableQueryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.dropTableQueryQuota(REALTIME_TABLE_NAME);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -248,8 +248,8 @@ public class TableQueryQuotaManagerTest {
throws Exception {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -265,8 +265,8 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(REALTIME_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -283,8 +283,8 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = generateBrokerResource(OFFLINE_TABLE_NAME);
TableConfig tableConfig = generateDefaultTableConfig(REALTIME_TABLE_NAME);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -296,8 +296,8 @@ public class TableQueryQuotaManagerTest {
QuotaConfig quotaConfig = new QuotaConfig();
quotaConfig.setMaxQueriesPerSecond("InvalidQpsQuota");
tableConfig.setQuotaConfig(quotaConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -309,8 +309,8 @@ public class TableQueryQuotaManagerTest {
QuotaConfig quotaConfig = new QuotaConfig();
quotaConfig.setMaxQueriesPerSecond("-1.0");
tableConfig.setQuotaConfig(quotaConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -318,8 +318,8 @@ public class TableQueryQuotaManagerTest {
throws Exception {
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
setQps(tableConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, null);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 0);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, null);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 0);
}
@Test
@@ -328,8 +328,8 @@ public class TableQueryQuotaManagerTest {
ExternalView brokerResource = new ExternalView(BROKER_RESOURCE_INSTANCE);
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
setQps(tableConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
}
@Test
@@ -339,11 +339,11 @@ public class TableQueryQuotaManagerTest {
brokerResource.setState(OFFLINE_TABLE_NAME, "broker_instance_2", "OFFLINE");
TableConfig tableConfig = generateDefaultTableConfig(OFFLINE_TABLE_NAME);
setQps(tableConfig);
- _tableQueryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
+ _queryQuotaManager.initTableQueryQuota(tableConfig, brokerResource);
// For the 1st version we don't check the number of online brokers.
// Thus the expected size now is 1. It'll be 0 when we bring dynamic rate back.
- Assert.assertEquals(_tableQueryQuotaManager.getRateLimiterMapSize(), 1);
+ Assert.assertEquals(_queryQuotaManager.getRateLimiterMapSize(), 1);
}
private TableConfig generateDefaultTableConfig(String tableName) {
@@ -370,7 +370,7 @@ public class TableQueryQuotaManagerTest {
throws InterruptedException {
int count = 0;
for (int i = 0; i < numOfTimesToRun; i++) {
- Assert.assertTrue(_tableQueryQuotaManager.acquire(RAW_TABLE_NAME));
+ Assert.assertTrue(_queryQuotaManager.acquire(RAW_TABLE_NAME));
count++;
Thread.sleep(millis);
}
@@ -380,7 +380,7 @@ public class TableQueryQuotaManagerTest {
count = 0;
millis /= 2;
for (int i = 0; i < numOfTimesToRun; i++) {
- if (!_tableQueryQuotaManager.acquire(RAW_TABLE_NAME)) {
+ if (!_queryQuotaManager.acquire(RAW_TABLE_NAME)) {
count++;
}
Thread.sleep(millis);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 8ed7999..fee6962 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,18 @@ public class CommonConstants {
}
public static class Broker {
+ public static final String ROUTING_TABLE_CONFIG_PREFIX = "pinot.broker.routing.table";
+ public static final String ACCESS_CONTROL_CONFIG_PREFIX = "pinot.broker.access.control";
+ public static final String METRICS_CONFIG_PREFIX = "pinot.broker.metrics";
+
+ public static final String CONFIG_OF_DELAY_SHUTDOWN_TIME_MS = "pinot.broker.delayShutdownTimeMs";
+ public static final long DEFAULT_DELAY_SHUTDOWN_TIME_MS = 10_000L;
+ public static final String CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS = "pinot.broker.enableTableLevelMetrics";
+ public static final String CONFIG_OF_REQUEST_HANDLER_TYPE = "pinot.broker.requestHandlerType";
+ public static final String SINGLE_CONNECTION_REQUEST_HANDLER_TYPE = "singleConnection";
+ public static final String CONNECTION_POOL_REQUEST_HANDLER_TYPE = "connectionPool";
+ public static final String DEFAULT_REQUEST_HANDLER_TYPE = SINGLE_CONNECTION_REQUEST_HANDLER_TYPE;
+
public static final String CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT = "pinot.broker.query.response.limit";
public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index d671311..f0580a0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -44,7 +44,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpStatus;
-import org.apache.pinot.broker.broker.BrokerServerBuilder;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -120,11 +119,12 @@ public abstract class ClusterTest extends ControllerTest {
Configuration brokerConf = new BaseConfiguration();
brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i));
- brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+ brokerConf.setProperty(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, 0);
// Randomly choose to use connection-pool or single-connection request handler
if (RANDOM.nextBoolean()) {
- brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
- BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
+ brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.CONNECTION_POOL_REQUEST_HANDLER_TYPE);
+ } else {
+ brokerConf.setProperty(Broker.CONFIG_OF_REQUEST_HANDLER_TYPE, Broker.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
}
overrideBrokerConf(brokerConf);
HelixBrokerStarter brokerStarter = new HelixBrokerStarter(brokerConf, _clusterName, zkStr);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org