You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/12 04:36:21 UTC

[incubator-pinot] branch broker_config created (now 2ffd7d5)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch broker_config
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 2ffd7d5  Remove redundant default broker configurations

This branch includes the following new commits:

     new 2ffd7d5  Remove redundant default broker configurations

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Remove redundant default broker configurations

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch broker_config
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2ffd7d532b878c93305041acf0749daf8280a94b
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu Apr 11 21:28:54 2019 -0700

    Remove redundant default broker configurations
    
    - Remove config-based routing configs because config-based
      routing was removed a long time ago
    - Remove HelixBrokerStarter.getZkAddressForBroker() because it
      does not apply to the current implementation
    - Replace config key strings with constants
    - Change timeout for default broker and broker in integration
      test to 60s
    - Replace PropertiesConfiguration with the lightweight
      BaseConfiguration when the config is not read from a file
---
 .../broker/helix/DefaultHelixBrokerConfig.java     | 54 ---------------
 .../broker/broker/helix/HelixBrokerStarter.java    | 80 ++++++----------------
 .../pinot/broker/broker/BrokerTestUtils.java       | 51 --------------
 .../broker/broker/HelixBrokerStarterTest.java      | 33 +++++----
 .../broker/broker/HelixBrokerStarterUtilsTest.java | 63 -----------------
 .../pinot/integration/tests/ClusterTest.java       | 38 +++++-----
 .../tests/NewConfigApplyIntegrationTest.java       |  3 +-
 .../tools/admin/command/StartBrokerCommand.java    | 16 ++---
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |  9 +--
 9 files changed, 72 insertions(+), 275 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
deleted file mode 100644
index e7ab072..0000000
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/DefaultHelixBrokerConfig.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker.helix;
-
-import java.util.Iterator;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-
-public class DefaultHelixBrokerConfig {
-
-  public static Configuration getDefaultBrokerConf() {
-    Configuration brokerConf = new PropertiesConfiguration();
-
-    // config based routing
-    brokerConf.addProperty("pinot.broker.transport.routingMode", "HELIX");
-
-    brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.class", "balanced");
-    brokerConf.addProperty("pinot.broker.routing.table.builder.default.offline.numOfRoutingTables", "10");
-    brokerConf.addProperty("pinot.broker.routing.table.builder.tables", "");
-
-    //client properties
-    brokerConf.addProperty("pinot.broker.client.queryPort", "8099");
-
-    return brokerConf;
-  }
-
-  public static Configuration getDefaultBrokerConf(Configuration externalConfigs) {
-    final Configuration defaultConfigs = getDefaultBrokerConf();
-    @SuppressWarnings("unchecked")
-    Iterator<String> iterable = externalConfigs.getKeys();
-    while (iterable.hasNext()) {
-      String key = iterable.next();
-      defaultConfigs.setProperty(key, externalConfigs.getProperty(key));
-    }
-    return defaultConfigs;
-  }
-}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 081d891..c54f27a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -25,9 +25,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.commons.lang.StringUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -51,7 +50,6 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
 import org.apache.pinot.common.utils.ServiceStatus;
-import org.apache.pinot.common.utils.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,14 +60,15 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HelixBrokerStarter {
-  private static final String PROPERTY_STORE = "PROPERTYSTORE";
+  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
+  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
 
   // Spectator Helix manager handles the custom change listeners, properties read/write
   private final HelixManager _spectatorHelixManager;
   // Participant Helix manager handles Helix functionality such as state transitions and messages
   private final HelixManager _participantHelixManager;
 
-  private final Configuration _pinotHelixProperties;
+  private final Configuration _brokerConf;
   private final HelixAdmin _helixAdmin;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final HelixDataAccessor _helixDataAccessor;
@@ -84,31 +83,26 @@ public class HelixBrokerStarter {
   // Set after broker is started, which is actually in the constructor.
   private AccessControlFactory _accessControlFactory;
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
-
-  private static final String ROUTING_TABLE_PARAMS_SUBSET_KEY = "pinot.broker.routing.table";
-
-  public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration pinotHelixProperties)
+  public HelixBrokerStarter(String helixClusterName, String zkServer, Configuration brokerConf)
       throws Exception {
-    this(null, helixClusterName, zkServer, pinotHelixProperties);
+    this(null, helixClusterName, zkServer, brokerConf);
   }
 
-  public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer,
-      Configuration pinotHelixProperties)
+  public HelixBrokerStarter(String brokerHost, String helixClusterName, String zkServer, Configuration brokerConf)
       throws Exception {
     LOGGER.info("Starting Pinot broker");
 
-    _pinotHelixProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf(pinotHelixProperties);
+    _brokerConf = brokerConf;
 
     if (brokerHost == null) {
       brokerHost = NetUtil.getHostAddress();
     }
 
-    final String brokerId = _pinotHelixProperties.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
-        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _pinotHelixProperties
+    String brokerId = _brokerConf.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
+        CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE + brokerHost + "_" + _brokerConf
             .getInt(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT));
 
-    _pinotHelixProperties.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId);
+    _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID, brokerId);
     setupHelixSystemProperties();
 
     // Remove all white-spaces from the list of zkServers (if any).
@@ -123,10 +117,10 @@ public class HelixBrokerStarter {
     _propertyStore = _spectatorHelixManager.getHelixPropertyStore();
     _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor();
     _helixExternalViewBasedRouting = new HelixExternalViewBasedRouting(_propertyStore, _spectatorHelixManager,
-        pinotHelixProperties.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
+        brokerConf.subset(ROUTING_TABLE_PARAMS_SUBSET_KEY));
     _tableQueryQuotaManager = new TableQueryQuotaManager(_spectatorHelixManager);
     _liveInstanceChangeHandler = new LiveInstanceChangeHandler(_spectatorHelixManager);
-    _brokerServerBuilder = startBroker(_pinotHelixProperties);
+    _brokerServerBuilder = startBroker(_brokerConf);
     _metricsRegistry = _brokerServerBuilder.getMetricsRegistry();
 
     // Initialize cluster change mediator
@@ -160,8 +154,8 @@ public class HelixBrokerStarter {
     stateMachineEngine
         .registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), stateModelFactory);
     _participantHelixManager.connect();
-    _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting,
-        _pinotHelixProperties.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
+    _tbiMessageHandler = new TimeboundaryRefreshMessageHandlerFactory(_helixExternalViewBasedRouting, _brokerConf
+        .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL,
             CommonConstants.Broker.DEFAULT_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL_MS));
     _participantHelixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), _tbiMessageHandler);
@@ -169,7 +163,7 @@ public class HelixBrokerStarter {
     addInstanceTagIfNeeded(helixClusterName, brokerId);
 
     // Register the service status handler
-    double minResourcePercentForStartup = _pinotHelixProperties
+    double minResourcePercentForStartup = _brokerConf
         .getDouble(CommonConstants.Broker.CONFIG_OF_BROKER_MIN_RESOURCE_PERCENT_FOR_START,
             CommonConstants.Broker.DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START);
     ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList
@@ -189,7 +183,7 @@ public class HelixBrokerStarter {
     // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
     // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
     // non-positive value, so set the default value as 1.
-    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _pinotHelixProperties
+    System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, _brokerConf
         .getString(CommonConstants.Helix.CONFIG_OF_BROKER_FLAPPING_TIME_WINDOW_MS,
             CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS));
   }
@@ -209,9 +203,6 @@ public class HelixBrokerStarter {
   }
 
   private BrokerServerBuilder startBroker(Configuration config) {
-    if (config == null) {
-      config = DefaultHelixBrokerConfig.getDefaultBrokerConf();
-    }
     BrokerServerBuilder brokerServerBuilder = new BrokerServerBuilder(config, _helixExternalViewBasedRouting,
         _helixExternalViewBasedRouting.getTimeBoundaryService(), _liveInstanceChangeHandler, _tableQueryQuotaManager);
     _accessControlFactory = brokerServerBuilder.getAccessControlFactory();
@@ -278,32 +269,6 @@ public class HelixBrokerStarter {
     return _accessControlFactory;
   }
 
-  /**
-   * The zk string format should be 127.0.0.1:3000,127.0.0.1:3001/app/a which applies
-   * the /helixClusterName/PROPERTY_STORE after chroot to all servers.
-   * Expected output for this method is:
-   * 127.0.0.1:3000/app/a/helixClusterName/PROPERTY_STORE,127.0.0.1:3001/app/a/helixClusterName/PROPERTY_STORE
-   *
-   * @param zkServers
-   * @param helixClusterName
-   * @return the full property store path
-   *
-   * @see org.apache.zookeeper.ZooKeeper#ZooKeeper(String, int, org.apache.zookeeper.Watcher)
-   */
-  public static String getZkAddressForBroker(String zkServers, String helixClusterName) {
-    List tokens = new ArrayList<String>();
-    String[] zkSplit = zkServers.split("/", 2);
-    String zkHosts = zkSplit[0];
-    String zkPathSuffix = StringUtil.join("/", helixClusterName, PROPERTY_STORE);
-    if (zkSplit.length > 1) {
-      zkPathSuffix = zkSplit[1] + "/" + zkPathSuffix;
-    }
-    for (String token : zkHosts.split(",")) {
-      tokens.add(StringUtil.join("/", StringUtils.chomp(token, "/"), zkPathSuffix));
-    }
-    return StringUtils.join(tokens, ",");
-  }
-
   public HelixManager getSpectatorHelixManager() {
     return _spectatorHelixManager;
   }
@@ -318,14 +283,11 @@ public class HelixBrokerStarter {
 
   public static HelixBrokerStarter startDefault()
       throws Exception {
-    Configuration configuration = new PropertiesConfiguration();
+    Configuration brokerConf = new BaseConfiguration();
     int port = 5001;
-    configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
-    configuration.addProperty("pinot.broker.timeoutMs", 500 * 1000L);
-
-    final HelixBrokerStarter pinotHelixBrokerStarter =
-        new HelixBrokerStarter(null, "quickstart", "localhost:2122", configuration);
-    return pinotHelixBrokerStarter;
+    brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, port);
+    brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+    return new HelixBrokerStarter(null, "quickstart", "localhost:2122", brokerConf);
   }
 
   public void shutdown() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
deleted file mode 100644
index f863e56..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/BrokerTestUtils.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-
-
-/**
- * Utilities to start a broker during unit tests.
- *
- */
-public class BrokerTestUtils {
-  public static Configuration getDefaultBrokerConfiguration() {
-    return DefaultHelixBrokerConfig.getDefaultBrokerConf();
-  }
-
-  public static HelixBrokerStarter startBroker(final String clusterName, final String zkStr,
-      final Configuration configuration) {
-    try {
-      return new HelixBrokerStarter(clusterName, zkStr, configuration);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void stopBroker(final HelixBrokerStarter brokerStarter) {
-    try {
-      brokerStarter.getBrokerServerBuilder().stop();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 08387a8..b3a0452 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -29,10 +29,10 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
 import org.apache.pinot.broker.routing.TimeBoundaryService;
@@ -57,7 +57,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
   private static final String RAW_DINING_TABLE_NAME = "dining";
   private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
   private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
-  private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf();
+
+  private final Configuration _brokerConf = new BaseConfiguration();
 
   private ZkClient _zkClient;
   private HelixBrokerStarter _helixBrokerStarter;
@@ -71,11 +72,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
 
     startController();
 
-    _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
-    _pinotHelixBrokerProperties
-        .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
-    _helixBrokerStarter =
-        new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
+    _brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
+    _brokerConf.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
+    _helixBrokerStarter = new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _brokerConf);
 
     ControllerRequestBuilderUtil
         .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
@@ -139,8 +138,10 @@ public class HelixBrokerStarterTest extends ControllerTest {
       throws Exception {
     IdealState idealState;
 
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
+        6);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
 
     ExternalView externalView =
@@ -173,8 +174,10 @@ public class HelixBrokerStarterTest extends ControllerTest {
         .setBrokerTenant("testBroker").setServerTenant("testServer").build();
     _helixResourceManager.addTable(tableConfig);
 
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
+        6);
+    idealState =
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
 
@@ -183,8 +186,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
       @Override
       public Boolean call()
           throws Exception {
-        return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
-            .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
+        return
+            _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+                .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
       }
     }, 30000L);
 
@@ -273,8 +277,7 @@ public class HelixBrokerStarterTest extends ControllerTest {
       TimeBoundaryService.TimeBoundaryInfo timeBoundaryInfo = _helixBrokerStarter.getHelixExternalViewBasedRouting().
           getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
       return currentTimeBoundary < Long.parseLong(timeBoundaryInfo.getTimeValue());
-    }, 5 * _pinotHelixBrokerProperties
-        .getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
+    }, 5 * _brokerConf.getLong(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL));
     tbi = _helixBrokerStarter.getHelixExternalViewBasedRouting().
         getTimeBoundaryService().getTimeBoundaryInfoFor(DINING_TABLE_NAME);
     Assert.assertTrue(currentTimeBoundary < Long.parseLong(tbi.getTimeValue()));
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
deleted file mode 100644
index f080f0e..0000000
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.broker.broker;
-
-import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class HelixBrokerStarterUtilsTest {
-
-  @Test
-  public void testZkParserUtil1() {
-    String zkServers = "hostname1,hostname2";
-    String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        "hostname1/helixClusterName/PROPERTYSTORE,hostname2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil2() {
-    String zkServers = "hostname1,hostname2/chroot1/chroot2";
-    String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        "hostname1/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil3() {
-    String zkServers = "hostname1:2181,hostname2:2181";
-    String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        "hostname1:2181/helixClusterName/PROPERTYSTORE,hostname2:2181/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-
-  @Test
-  public void testZkParserUtil4() {
-    String zkServers = "hostname1:2181,hostname2:2181/chroot1/chroot2";
-    String zkAddressForBroker = HelixBrokerStarter.getZkAddressForBroker(zkServers, "helixClusterName");
-    String expectedZkAddressForBroker =
-        "hostname1:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE,hostname2:2181/chroot1/chroot2/helixClusterName/PROPERTYSTORE";
-    Assert.assertEquals(zkAddressForBroker, expectedZkAddressForBroker);
-  }
-}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f677be3..5d67305 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -39,18 +39,19 @@ import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.http.HttpStatus;
 import org.apache.pinot.broker.broker.BrokerServerBuilder;
-import org.apache.pinot.broker.broker.BrokerTestUtils;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.apache.pinot.common.utils.CommonConstants.Minion;
 import org.apache.pinot.common.utils.CommonConstants.Server;
@@ -96,33 +97,36 @@ public abstract class ClusterTest extends ControllerTest {
   protected TableConfig _offlineTableConfig;
   protected TableConfig _realtimeTableConfig;
 
-  protected void startBroker() {
+  protected void startBroker()
+      throws Exception {
     startBrokers(1);
   }
 
-  protected void startBroker(int basePort, String zkStr) {
+  protected void startBroker(int basePort, String zkStr)
+      throws Exception {
     startBrokers(1, basePort, zkStr);
   }
 
-  protected void startBrokers(int numBrokers) {
+  protected void startBrokers(int numBrokers)
+      throws Exception {
     startBrokers(numBrokers, DEFAULT_BROKER_PORT, ZkStarter.DEFAULT_ZK_STR);
   }
 
-  protected void startBrokers(int numBrokers, int basePort, String zkStr) {
+  protected void startBrokers(int numBrokers, int basePort, String zkStr)
+      throws Exception {
     _brokerBaseApiUrl = "http://localhost:" + basePort;
     for (int i = 0; i < numBrokers; i++) {
-      Configuration configuration = BrokerTestUtils.getDefaultBrokerConfiguration();
-      configuration.setProperty("pinot.broker.timeoutMs", 100 * 1000L);
-      configuration.setProperty("pinot.broker.client.queryPort", Integer.toString(basePort + i));
-      configuration.setProperty("pinot.broker.routing.table.builder.class", "random");
-      configuration.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
+      Configuration brokerConf = new BaseConfiguration();
+      brokerConf.setProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, 60 * 1000L);
+      brokerConf.setProperty(Helix.KEY_OF_BROKER_QUERY_PORT, Integer.toString(basePort + i));
+      brokerConf.setProperty(BrokerServerBuilder.DELAY_SHUTDOWN_TIME_MS_CONFIG, 0);
       // Randomly choose to use connection-pool or single-connection request handler
       if (RANDOM.nextBoolean()) {
-        configuration.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
+        brokerConf.setProperty(BrokerServerBuilder.REQUEST_HANDLER_TYPE_CONFIG,
             BrokerServerBuilder.SINGLE_CONNECTION_REQUEST_HANDLER_TYPE);
       }
-      overrideBrokerConf(configuration);
-      _brokerStarters.add(BrokerTestUtils.startBroker(_clusterName, zkStr, configuration));
+      overrideBrokerConf(brokerConf);
+      _brokerStarters.add(new HelixBrokerStarter(_clusterName, zkStr, brokerConf));
     }
   }
 
@@ -213,17 +217,13 @@ public abstract class ClusterTest extends ControllerTest {
     // Do nothing, to be overridden by tests if they need something specific
   }
 
-  protected void overrideBrokerConf(Configuration configuration) {
+  protected void overrideBrokerConf(Configuration brokerConf) {
     // Do nothing, to be overridden by tests if they need something specific
   }
 
   protected void stopBroker() {
     for (HelixBrokerStarter brokerStarter : _brokerStarters) {
-      try {
-        BrokerTestUtils.stopBroker(brokerStarter);
-      } catch (Exception e) {
-        LOGGER.error("Encountered exception while stopping broker {}", e.getMessage());
-      }
+      brokerStarter.shutdown();
     }
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
index 5fb52cc..8f0045d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NewConfigApplyIntegrationTest.java
@@ -43,7 +43,8 @@ public class NewConfigApplyIntegrationTest extends BaseClusterIntegrationTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(NewConfigApplyIntegrationTest.class);
 
   @BeforeClass
-  public void setUp() {
+  public void setUp()
+      throws Exception {
     // Start an empty cluster
     startZk();
     startController();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
index 60bcf2c..52b046f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartBrokerCommand.java
@@ -19,8 +19,8 @@
 package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.tools.Command;
@@ -107,23 +107,21 @@ public class StartBrokerCommand extends AbstractBaseAdminCommand implements Comm
   public boolean execute()
       throws Exception {
     try {
-      Configuration configuration = readConfigFromFile(_configFileName);
-      if (configuration == null) {
+      Configuration brokerConf = readConfigFromFile(_configFileName);
+      if (brokerConf == null) {
         if (_configFileName != null) {
           LOGGER.error("Error: Unable to find file {}.", _configFileName);
           return false;
         }
 
-        configuration = new PropertiesConfiguration();
-        configuration.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort);
-        configuration.setProperty("pinot.broker.routing.table.builder.class", "random");
+        brokerConf = new BaseConfiguration();
+        brokerConf.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, _brokerPort);
       }
 
       LOGGER.info("Executing command: " + toString());
-      final HelixBrokerStarter pinotHelixBrokerStarter =
-          new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, configuration);
+      new HelixBrokerStarter(_brokerHost, _clusterName, _zkAddress, brokerConf);
 
-      String pidFile = ".pinotAdminBroker-" + String.valueOf(System.currentTimeMillis()) + ".pid";
+      String pidFile = ".pinotAdminBroker-" + System.currentTimeMillis() + ".pid";
       savePID(System.getProperty("java.io.tmpdir") + File.separator + pidFile);
       return true;
     } catch (Exception e) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index c8a6005..f9ac81c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -35,6 +35,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
@@ -209,12 +210,12 @@ public class PerfBenchmarkDriver {
       LOGGER.info("Skipping start broker step. Assumes broker is already started.");
       return;
     }
-    Configuration brokerConfiguration = new PropertiesConfiguration();
+    Configuration brokerConf = new BaseConfiguration();
     String brokerInstanceName = "Broker_localhost_" + CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
-    brokerConfiguration.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName);
-    brokerConfiguration.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS);
+    brokerConf.setProperty(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, brokerInstanceName);
+    brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, BROKER_TIMEOUT_MS);
     LOGGER.info("Starting broker instance: {}", brokerInstanceName);
-    new HelixBrokerStarter(_clusterName, _zkAddress, brokerConfiguration);
+    new HelixBrokerStarter(_clusterName, _zkAddress, brokerConf);
   }
 
   private void startServer()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org