You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/12/11 19:54:44 UTC
[pinot] 01/01: Move brokerId extraction to BaseBrokerStarter
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch move-brokerId-extraction-to-BaseBrokerStarter
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 892ce9baf841a4412663d760c3bf880255b91574
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Sun Dec 11 11:54:27 2022 -0800
Move brokerId extraction to BaseBrokerStarter
---
.../broker/broker/helix/BaseBrokerStarter.java | 21 ++++++++++++++++-----
.../requesthandler/BaseBrokerRequestHandler.java | 13 ++-----------
.../BrokerRequestHandlerDelegate.java | 5 ++++-
.../requesthandler/GrpcBrokerRequestHandler.java | 4 ++--
.../MultiStageBrokerRequestHandler.java | 11 ++++++-----
.../SingleConnectionBrokerRequestHandler.java | 10 +++++-----
.../BaseBrokerRequestHandlerTest.java | 2 +-
.../LiteralOnlyBrokerRequestTest.java | 18 +++++++++---------
8 files changed, 45 insertions(+), 39 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 694e6fde1c..ab35409881 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -264,21 +265,22 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
// Create Broker request handler.
+ String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BrokerRequestHandler singleStageBrokerRequestHandler = null;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
- new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
+ new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+ new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
} else {
singleStageBrokerRequestHandler =
- new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+ new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager);
}
}
@@ -289,11 +291,11 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
- new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
+ new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics);
}
- _brokerRequestHandler = new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
+ _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler, _brokerMetrics);
_brokerRequestHandler.start();
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -433,6 +435,15 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
}
+ private String getDefaultBrokerId() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting default broker Id", e);
+ return "";
+ }
+ }
+
@Override
public void stop() {
LOGGER.info("Shutting down Pinot broker");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ae88689a51..ba2b0e8dac 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -129,9 +129,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById;
- public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
+ public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
+ _brokerId = brokerId;
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
@@ -146,7 +147,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_enableDistinctCountBitmapOverride =
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
- _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
@@ -160,15 +160,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation);
}
- private String getDefaultBrokerId() {
- try {
- return InetAddress.getLocalHost().getHostName();
- } catch (Exception e) {
- LOGGER.error("Caught exception while getting default broker Id", e);
- return "";
- }
- }
-
@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index c6eee5f04f..3e6a0598be 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -49,9 +49,11 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BrokerRequestHandler _singleStageBrokerRequestHandler;
private final BrokerRequestHandler _multiStageWorkerRequestHandler;
private final BrokerMetrics _brokerMetrics;
+ private final String _brokerId;
- public BrokerRequestHandlerDelegate(BrokerRequestHandler singleStageBrokerRequestHandler,
+ public BrokerRequestHandlerDelegate(String brokerId, BrokerRequestHandler singleStageBrokerRequestHandler,
@Nullable BrokerRequestHandler multiStageWorkerRequestHandler, BrokerMetrics brokerMetrics) {
+ _brokerId = brokerId;
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageWorkerRequestHandler = multiStageWorkerRequestHandler;
_brokerMetrics = brokerMetrics;
@@ -81,6 +83,7 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
+ requestContext.setBrokerId(_brokerId);
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 2af8027668..95d17c955b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -59,10 +59,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
private final PinotStreamingQueryClient _streamingQueryClient;
// TODO: Support TLS
- public GrpcBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
+ public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
- super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+ super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Grpc BrokerRequestHandler.");
_grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 6cdd68cd80..ee83a745b3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -72,15 +72,16 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;
- public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
- AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics) {
- super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
+ BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
+ QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) {
+ super(config, brokerIdFromConfig, routingManager, accessControlFactory, queryQuotaManager, tableCache,
+ brokerMetrics);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String reducerHostname = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (reducerHostname == null) {
// use broker ID as host name, but remove the broker instance prefix and any "_"-separated suffix
- String brokerId = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
+ String brokerId = brokerIdFromConfig;
brokerId = brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? brokerId.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId;
brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index c34b8dda6a..d78aa5ffe8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -69,11 +69,11 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
private final QueryRouter _queryRouter;
private final FailureDetector _failureDetector;
- public SingleConnectionBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
- AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
- BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig,
- ServerRoutingStatsManager serverRoutingStatsManager) {
- super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+ public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
+ BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
+ QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
+ TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) {
+ super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Netty BrokerRequestHandler.");
_brokerReduceService = new BrokerReduceService(_config);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index 88f72600e5..d2ea5c9b7a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -206,7 +206,7 @@ public class BaseBrokerRequestHandlerTest {
PinotConfiguration config =
new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
BaseBrokerRequestHandler requestHandler =
- new BaseBrokerRequestHandler(config, routingManager, new AllowAllAccessControlFactory(),
+ new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
@Override
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 389db74fb6..a9ee2a5dee 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
- new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
- new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
- null, null, mock(ServerRoutingStatsManager.class));
+ new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+ null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+ null, mock(ServerRoutingStatsManager.class));
// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org