You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2019/04/11 01:10:42 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #4100: Refactor HelixBrokerStarter to separate constructor and start()

Jackie-Jiang commented on a change in pull request #4100: Refactor HelixBrokerStarter to separate constructor and start()
URL: https://github.com/apache/incubator-pinot/pull/4100#discussion_r274227551
 
 

 ##########
 File path: pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
 ##########
 @@ -47,189 +48,196 @@
 import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
-import org.apache.pinot.common.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * Helix Broker Startable
- *
- *
- */
 public class HelixBrokerStarter {
-  private static final String PROPERTY_STORE = "PROPERTYSTORE";
+  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
+  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
+
+  private final Configuration _properties;
+  private final String _clusterName;
+  private final String _zkServers;
+  private final String _brokerId;
 
   // Spectator Helix manager handles the custom change listeners, properties read/write
-  private final HelixManager _spectatorHelixManager;
+  private HelixManager _spectatorHelixManager;
   // Participant Helix manager handles Helix functionality such as state transitions and messages
-  private final HelixManager _participantHelixManager;
-
-  private final Configuration _pinotHelixProperties;
-  private final HelixAdmin _helixAdmin;
-  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
-  private final HelixDataAccessor _helixDataAccessor;
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final BrokerServerBuilder _brokerServerBuilder;
-  private final LiveInstanceChangeHandler _liveInstanceChangeHandler;
-  private final MetricsRegistry _metricsRegistry;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
-  private final ClusterChangeMediator _clusterChangeMediator;
-  private final TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
-
-  // Set after broker is started, which is actually in the constructor.
-  private AccessControlFactory _accessControlFactory;
+  private HelixManager _participantHelixManager;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
-
-  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
+  private HelixAdmin _helixAdmin;
+  private ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private HelixDataAccessor _helixDataAccessor;
+  private HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
+  private BrokerServerBuilder _brokerServerBuilder;
+  private AccessControlFactory _accessControlFactory;
+  private MetricsRegistry _metricsRegistry;
+  private ClusterChangeMediator _clusterChangeMediator;
+  private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler;
 
-  public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration pinotHelixProperties)
+  public HelixBrokerStarter(Configuration properties, String clusterName, String zkServer)
       throws Exception {
-    this(null, helixClusterName, zkServer, pinotHelixProperties);
+    this(properties, clusterName, zkServer, null);
   }
 
-  public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer,
-      Configuration pinotHelixProperties)
+  public HelixBrokerStarter(Configuration properties, String clusterName, String zkServer, @Nullable String brokerHost)
       throws Exception {
-    LOGGER.info("Starting Pinot broker");
+    _properties = properties;
+    setupHelixSystemProperties();
 
-    _pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
+    _clusterName = clusterName;
+
+    // Remove all white-spaces from the list of zkServers (if any).
+    _zkServers = zkServer.replaceAll("\\s+", "");
 
     if (brokerHost == null) {
       brokerHost = NetUtil.getHostAddress();
     }
-
-    final String brokerId = _pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
-        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _pinotHelixProperties
+    _brokerId = _properties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _properties
             .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
+    _properties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, _brokerId);
+  }
 
-    _pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId);
-    setupHelixSystemProperties();
+  private void setupHelixSystemProperties() {
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _properties
+        .getString(CommonConstants.Broker.CONFIG_OF_HELIX_FLAPPING_TIME_WINDOW_MS,
+            CommonConstants.Broker.DEFAULT_HELIX_FLAPPING_TIME_WINDOW_MS));
+  }
 
-    // Remove all white-spaces from the list of zkServers (if any).
-    String zkServers = zkServer.replaceAll("\\s+", "");
+  public void start()
+      throws Exception {
+    LOGGER.info("Starting Pinot broker");
 
-    LOGGER.info("Connecting Helix components");
-    // Connect spectator Helix manager.
+    // Connect the spectator Helix manager
+    LOGGER.info("Connecting spectator Helix manager");
     _spectatorHelixManager =
-        HelixManagerFactory.getZKHelixManager(helixClusterName, brokerId, InstanceType.SPECTATOR, zkServers);
+        HelixManagerFactory.getZKHelixManager(_clusterName, _brokerId, InstanceType.SPECTATOR, _zkServers);
     _spectatorHelixManager.connect();
     _helixAdmin = _spectatorHelixManager.getClusterManagmentTool();
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
-        pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
-    _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
-    _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
-    _brokerServerBuilder = startBroker(_pinotHelixProperties);
+        _properties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+    TableQueryQuotaManager tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
+    LiveInstanceChangeHandler liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
+
+    // Set up the broker server builder
+    LOGGER.info("Setting up broker server builder");
+    _brokerServerBuilder = new BrokerServerBuilder(_properties, _helixExternalViewBasedRouting,
+        _helixExternalViewBasedRouting.getTimeBoundaryService(), liveInstanceChangeHandler, tableQueryQuotaManager);
+    _accessControlFactory = _brokerServerBuilder.getAccessControlFactory();
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
+    BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics();
+    _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics);
+    tableQueryQuotaManager.setBrokerMetrics(brokerMetrics);
 
 Review comment:
   We need to access HelixExternalViewBasedRouting in other methods, but not TableQueryQuotaManager (this resolves the IDE warnings, lol).
   Actually, I think it would be better to make all of them protected so that classes extending this one can access them.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org