You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2022/12/11 19:54:44 UTC

[pinot] 01/01: Move brokerId extraction to BaseBrokerStarter

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch move-brokerId-extraction-to-BaseBrokerStarter
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 892ce9baf841a4412663d760c3bf880255b91574
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Sun Dec 11 11:54:27 2022 -0800

    Move brokerId extraction to BaseBrokerStarter
---
 .../broker/broker/helix/BaseBrokerStarter.java      | 21 ++++++++++++++++-----
 .../requesthandler/BaseBrokerRequestHandler.java    | 13 ++-----------
 .../BrokerRequestHandlerDelegate.java               |  5 ++++-
 .../requesthandler/GrpcBrokerRequestHandler.java    |  4 ++--
 .../MultiStageBrokerRequestHandler.java             | 11 ++++++-----
 .../SingleConnectionBrokerRequestHandler.java       | 10 +++++-----
 .../BaseBrokerRequestHandlerTest.java               |  2 +-
 .../LiteralOnlyBrokerRequestTest.java               | 18 +++++++++---------
 8 files changed, 45 insertions(+), 39 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 694e6fde1c..ab35409881 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -264,21 +265,22 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
 
     // Create Broker request handler.
+    String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     String brokerRequestHandlerType =
         _brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
     BrokerRequestHandler singleStageBrokerRequestHandler = null;
     if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
       singleStageBrokerRequestHandler =
-          new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
+          new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
               tableCache, _brokerMetrics, null);
     } else { // default request handler type, e.g. netty
       if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
         singleStageBrokerRequestHandler =
-            new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+            new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
                 queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
       } else {
         singleStageBrokerRequestHandler =
-            new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
+            new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
                 queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager);
       }
     }
@@ -289,11 +291,11 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
       // worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
       // TODO: decouple protocol and engine selection.
       multiStageBrokerRequestHandler =
-          new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
+          new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
               tableCache, _brokerMetrics);
     }
 
-    _brokerRequestHandler = new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
+    _brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
         multiStageBrokerRequestHandler, _brokerMetrics);
     _brokerRequestHandler.start();
     String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
@@ -433,6 +435,15 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
             new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
   }
 
+  private String getDefaultBrokerId() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while getting default broker Id", e);
+      return "";
+    }
+  }
+
   @Override
   public void stop() {
     LOGGER.info("Shutting down Pinot broker");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index ae88689a51..ba2b0e8dac 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -129,9 +129,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   private final boolean _enableDistinctCountBitmapOverride;
   private final Map<Long, QueryServers> _queriesById;
 
-  public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
+  public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics) {
+    _brokerId = brokerId;
     _config = config;
     _routingManager = routingManager;
     _accessControlFactory = accessControlFactory;
@@ -146,7 +147,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _enableDistinctCountBitmapOverride =
         _config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);
 
-    _brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
     _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
         config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
@@ -160,15 +160,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation);
   }
 
-  private String getDefaultBrokerId() {
-    try {
-      return InetAddress.getLocalHost().getHostName();
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while getting default broker Id", e);
-      return "";
-    }
-  }
-
   @Override
   public Map<Long, String> getRunningQueries() {
     Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index c6eee5f04f..3e6a0598be 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -49,9 +49,11 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
   private final BrokerRequestHandler _singleStageBrokerRequestHandler;
   private final BrokerRequestHandler _multiStageWorkerRequestHandler;
   private final BrokerMetrics _brokerMetrics;
+  private final String _brokerId;
 
-  public BrokerRequestHandlerDelegate(BrokerRequestHandler singleStageBrokerRequestHandler,
+  public BrokerRequestHandlerDelegate(String brokerId, BrokerRequestHandler singleStageBrokerRequestHandler,
       @Nullable BrokerRequestHandler multiStageWorkerRequestHandler, BrokerMetrics brokerMetrics) {
+    _brokerId = brokerId;
     _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
     _multiStageWorkerRequestHandler = multiStageWorkerRequestHandler;
     _brokerMetrics = brokerMetrics;
@@ -81,6 +83,7 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
   public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
       @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
       throws Exception {
+    requestContext.setBrokerId(_brokerId);
     if (sqlNodeAndOptions == null) {
       try {
         sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 2af8027668..95d17c955b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -59,10 +59,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final PinotStreamingQueryClient _streamingQueryClient;
 
   // TODO: Support TLS
-  public GrpcBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
+  public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
       AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
       BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
-    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+    super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
     LOGGER.info("Using Grpc BrokerRequestHandler.");
     _grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 6cdd68cd80..ee83a745b3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -72,15 +72,16 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private final QueryEnvironment _queryEnvironment;
   private final QueryDispatcher _queryDispatcher;
 
-  public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
-      BrokerMetrics brokerMetrics) {
-    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+  public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
+      BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
+      QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) {
+    super(config, brokerIdFromConfig, routingManager, accessControlFactory, queryQuotaManager, tableCache,
+        brokerMetrics);
     LOGGER.info("Using Multi-stage BrokerRequestHandler.");
     String reducerHostname = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
     if (reducerHostname == null) {
       // use broker ID as host name, but remove the instance prefix and everything from the first '_' onward
-      String brokerId = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
+      String brokerId = brokerIdFromConfig;
       brokerId = brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? brokerId.substring(
           CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId;
       brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index c34b8dda6a..d78aa5ffe8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -69,11 +69,11 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
   private final QueryRouter _queryRouter;
   private final FailureDetector _failureDetector;
 
-  public SingleConnectionBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
-      AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
-      BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig,
-      ServerRoutingStatsManager serverRoutingStatsManager) {
-    super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
+  public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
+      BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
+      QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
+      TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) {
+    super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
     LOGGER.info("Using Netty BrokerRequestHandler.");
 
     _brokerReduceService = new BrokerReduceService(_config);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index 88f72600e5..d2ea5c9b7a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -206,7 +206,7 @@ public class BaseBrokerRequestHandlerTest {
     PinotConfiguration config =
         new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
     BaseBrokerRequestHandler requestHandler =
-        new BaseBrokerRequestHandler(config, routingManager, new AllowAllAccessControlFactory(),
+        new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
             queryQuotaManager, tableCache,
             new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
           @Override
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 389db74fb6..a9ee2a5dee 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -181,9 +181,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandler()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
-            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+            null, mock(ServerRoutingStatsManager.class));
 
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
@@ -209,9 +209,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testBrokerRequestHandlerWithAsFunction()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
-            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+            null, mock(ServerRoutingStatsManager.class));
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = JsonUtils.stringToJsonNode(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
@@ -416,9 +416,9 @@ public class LiteralOnlyBrokerRequestTest {
   public void testExplainPlanLiteralOnly()
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
-        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
-            new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
-            null, null, mock(ServerRoutingStatsManager.class));
+        new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
+            null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
+            null, mock(ServerRoutingStatsManager.class));
 
     // Test 1: select constant
     JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org