You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/03/22 20:56:19 UTC

[incubator-pinot] branch master updated: Add controller mode logic in ControllerStarter (#3864)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c9fbd6a  Add controller mode logic in ControllerStarter (#3864)
c9fbd6a is described below

commit c9fbd6a11ca18c6497462f4698130d2735addc77
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Mar 22 13:56:13 2019 -0700

    Add controller mode logic in ControllerStarter (#3864)
    
    * Add controller mode logic
    
    * Move controller mode out of resource manager
---
 .../broker/broker/HelixBrokerStarterTest.java      |  76 +++-----
 .../apache/pinot/common/utils/CommonConstants.java |   1 +
 .../apache/pinot/controller/ControllerConf.java    |  17 ++
 .../controller/ControllerLeadershipManager.java    |   2 +
 .../apache/pinot/controller/ControllerStarter.java | 205 ++++++++++++++++-----
 .../helix/core/PinotHelixResourceManager.java      |  32 +++-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   3 +
 .../core/realtime/SegmentCompletionManager.java    |   5 +
 .../rebalance/RebalanceSegmentStrategyFactory.java |   5 +
 .../PinotInstanceRestletResourceTest.java          |  20 +-
 .../controller/api/resources/TableViewsTest.java   |   2 +
 .../helix/ControllerPeriodicTaskStarterTest.java   |   8 +-
 .../pinot/controller/helix/ControllerTest.java     |  40 +++-
 .../controller/helix/PinotControllerModeTest.java  | 115 ++++++++++++
 .../controller/helix/PinotResourceManagerTest.java |  56 +++---
 .../sharding/SegmentAssignmentStrategyTest.java    |  88 ++++-----
 .../validation/ValidationManagerTest.java          |  57 +++---
 ...vertToRawIndexMinionClusterIntegrationTest.java |   4 +-
 .../tests/OfflineClusterIntegrationTest.java       |   3 +-
 .../admin/command/StartControllerCommand.java      |   7 +-
 20 files changed, 511 insertions(+), 235 deletions(-)

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index a9a7568..08387a8 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.broker.broker;
 
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -31,10 +30,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
@@ -44,13 +41,10 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterTest;
@@ -58,17 +52,14 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 
-public class HelixBrokerStarterTest {
+public class HelixBrokerStarterTest extends ControllerTest {
   private static final int SEGMENT_COUNT = 6;
-  private PinotHelixResourceManager _pinotResourceManager;
-  private static final String HELIX_CLUSTER_NAME = "TestHelixBrokerStarter";
   private static final String RAW_DINING_TABLE_NAME = "dining";
   private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
   private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
   private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf();
 
   private ZkClient _zkClient;
-  private HelixAdmin _helixAdmin;
   private HelixBrokerStarter _helixBrokerStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
 
@@ -77,43 +68,39 @@ public class HelixBrokerStarterTest {
       throws Exception {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
     _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
-    final String instanceId = "localhost_helixController";
-    _pinotResourceManager =
-        new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L,
-            true, /*isUpdateStateModel=*/false, false);
-    _pinotResourceManager.start();
-    _helixAdmin = _pinotResourceManager.getHelixAdmin();
+
+    startController();
 
     _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
     _pinotHelixBrokerProperties
         .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
     _helixBrokerStarter =
-        new HelixBrokerStarter(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
+        new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
 
     ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 5, true);
+        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
     ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
+        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
 
-    while (_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size() == 0
-        || _helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size() == 0) {
+    while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size() == 0
+        || _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size() == 0) {
       Thread.sleep(100);
     }
 
     TableConfig offlineTableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_DINING_TABLE_NAME)
             .setTimeColumnName("timeColumn").setTimeType("DAYS").build();
-    _pinotResourceManager.addTable(offlineTableConfig);
+    _helixResourceManager.addTable(offlineTableConfig);
     setupRealtimeTable();
 
     for (int i = 0; i < 5; i++) {
-      _pinotResourceManager
+      _helixResourceManager
           .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
     }
 
     Thread.sleep(1000);
 
-    ExternalView externalView = _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME);
+    ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
     Assert.assertEquals(externalView.getPartitionSet().size(), 5);
   }
 
@@ -134,18 +121,15 @@ public class HelixBrokerStarterTest {
             setStreamConfigs(streamConfigs).build();
     Schema schema = new Schema();
     schema.setSchemaName(RAW_DINING_TABLE_NAME);
-    _pinotResourceManager.addOrUpdateSchema(schema);
-    // Fake an PinotLLCRealtimeSegmentManager instance: required for a realtime table creation.
-    PinotLLCRealtimeSegmentManager
-        .create(_pinotResourceManager, new ControllerConf(), new ControllerMetrics(new MetricsRegistry()));
-    _pinotResourceManager.addTable(realtimeTimeConfig);
+    _helixResourceManager.addOrUpdateSchema(schema);
+    _helixResourceManager.addTable(realtimeTimeConfig);
     _helixBrokerStarter.getHelixExternalViewBasedRouting()
-        .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<InstanceConfig>());
+        .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<>());
   }
 
   @AfterTest
   public void tearDown() {
-    _pinotResourceManager.stop();
+    _helixResourceManager.stop();
     _zkClient.close();
     ZkStarter.stopLocalZkServer(_zookeeperInstance);
   }
@@ -155,12 +139,12 @@ public class HelixBrokerStarterTest {
       throws Exception {
     IdealState idealState;
 
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
+    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
 
     ExternalView externalView =
-        _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+        _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(externalView.getStateMap(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
 
     HelixExternalViewBasedRouting helixExternalViewBasedRouting =
@@ -187,10 +171,10 @@ public class HelixBrokerStarterTest {
     final String tableName = "coffee";
     TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
         .setBrokerTenant("testBroker").setServerTenant("testServer").build();
-    _pinotResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 6);
-    idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
+    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
     Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
 
@@ -199,13 +183,13 @@ public class HelixBrokerStarterTest {
       @Override
       public Boolean call()
           throws Exception {
-        return _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+        return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
             .getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
       }
     }, 30000L);
 
     externalView =
-        _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+        _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     Assert.assertEquals(externalView.getStateMap(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
 
     // Wait up to 30s for routing table to reach the expected size
@@ -225,7 +209,7 @@ public class HelixBrokerStarterTest {
         brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
         5);
 
-    _pinotResourceManager
+    _helixResourceManager
         .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
 
     // Wait up to 30s for external view to reach the expected size
@@ -233,12 +217,12 @@ public class HelixBrokerStarterTest {
       @Override
       public Boolean call()
           throws Exception {
-        return _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME).getPartitionSet().size()
+        return _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME).getPartitionSet().size()
             == SEGMENT_COUNT;
       }
     }, 30000L);
 
-    externalView = _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME);
+    externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
     Assert.assertEquals(externalView.getPartitionSet().size(), SEGMENT_COUNT);
     tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
     Arrays.sort(tableArray);
@@ -271,14 +255,14 @@ public class HelixBrokerStarterTest {
 
     Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary));
 
-    List<String> segmentNames = _pinotResourceManager.getSegmentsFor(DINING_TABLE_NAME);
+    List<String> segmentNames = _helixResourceManager.getSegmentsFor(DINING_TABLE_NAME);
     long endTime = currentTimeBoundary + 10;
     // Refresh all 5 segments.
     for (String segment : segmentNames) {
       OfflineSegmentZKMetadata offlineSegmentZKMetadata =
-          _pinotResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
+          _helixResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
       Assert.assertNotNull(offlineSegmentZKMetadata);
-      _pinotResourceManager.refreshSegment(
+      _helixResourceManager.refreshSegment(
           SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_DINING_TABLE_NAME, segment, endTime++),
           offlineSegmentZKMetadata);
     }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0777ad4..cf09060 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -31,6 +31,7 @@ public class CommonConstants {
 
     public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
     public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
+    public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
 
     public static final String SERVER_INSTANCE_TYPE = "server";
     public static final String BROKER_INSTANCE_TYPE = "broker";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1179996..1faa949 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
 import org.apache.pinot.filesystem.LocalPinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +57,13 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
   private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
+  private static final String CONTROLLER_MODE = "controller.mode";
+
+  public enum ControllerMode {
+    DUAL,
+    PINOT_ONLY,
+    HELIX_ONLY
+  }
 
   public static class ControllerPeriodicTasksConf {
     // frequency configs
@@ -143,6 +151,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
+  private static final String DEFAULT_CONTROLLER_MODE = ControllerMode.DUAL.name();
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -596,4 +605,12 @@ public class ControllerConf extends PropertiesConfiguration {
   public long getPeriodicTaskInitialDelayInSeconds() {
     return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
   }
+
+  public void setControllerMode(ControllerMode controllerMode) {
+    setProperty(CONTROLLER_MODE, controllerMode.name());
+  }
+
+  public ControllerMode getControllerMode() {
+    return ControllerMode.valueOf(getString(CONTROLLER_MODE, DEFAULT_CONTROLLER_MODE).toUpperCase());
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
index ed6b71f..6ffa655 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
@@ -34,6 +34,7 @@ public class ControllerLeadershipManager {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class);
 
+  // TODO: fix the misuse of singleton.
   private static ControllerLeadershipManager INSTANCE = null;
 
   private HelixManager _helixManager;
@@ -75,6 +76,7 @@ public class ControllerLeadershipManager {
     if (_amILeader) {
       onBecomingNonLeader();
     }
+    INSTANCE = null;
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 6416691..4051990 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -58,6 +58,8 @@ import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManag
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
+import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
 import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
@@ -79,11 +81,21 @@ public class ControllerStarter {
 
   private final ControllerConf _config;
   private final ControllerAdminApiApplication _adminApp;
+  // TODO: rename this variable once it's fully separated from the Helix controller.
   private final PinotHelixResourceManager _helixResourceManager;
   private final MetricsRegistry _metricsRegistry;
   private final ControllerMetrics _controllerMetrics;
   private final ExecutorService _executorService;
 
+  private final String _helixZkURL;
+  private final String _helixClusterName;
+  private final String _instanceId;
+  private final boolean _isUpdateStateModel;
+  private final boolean _enableBatchMessageMode;
+  private final ControllerConf.ControllerMode _controllerMode;
+
+  private HelixManager _helixControllerManager;
+
   // Can only be constructed after resource manager getting started
   private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
   private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
@@ -95,23 +107,46 @@ public class ControllerStarter {
   private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+  private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
     _config = conf;
-    _adminApp =
-        new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
-    // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link ControllerStarter::start()}
-    _helixResourceManager = new PinotHelixResourceManager(_config);
+    _controllerMode = conf.getControllerMode();
+    // Helix related settings.
+    _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
+    _helixClusterName = _config.getHelixClusterName();
+    _instanceId = conf.getControllerHost() + "_" + conf.getControllerPort();
+    _isUpdateStateModel = _config.isUpdateSegmentStateModel();
+    _enableBatchMessageMode = _config.getEnableBatchMessageMode();
+
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
-    _executorService =
-        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+    _serviceStatusCallbackList = new ArrayList<>();
+    if (_controllerMode == ControllerConf.ControllerMode.HELIX_ONLY) {
+      _adminApp = null;
+      _helixResourceManager = null;
+      _executorService = null;
+    } else {
+      _adminApp =
+          new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
+      // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link ControllerStarter::start()}
+      _helixResourceManager = new PinotHelixResourceManager(_config);
+      _executorService =
+          Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+    }
   }
 
   public PinotHelixResourceManager getHelixResourceManager() {
     return _helixResourceManager;
   }
 
+  /**
+   * Gets the Helix Manager connected as Helix controller.
+   */
+  public HelixManager getHelixControllerManager() {
+    return _helixControllerManager;
+  }
+
   public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
     return _offlineSegmentIntervalChecker;
   }
@@ -133,50 +168,75 @@ public class ControllerStarter {
   }
 
   public void start() {
-    LOGGER.info("Starting Pinot controller");
-
+    LOGGER.info("Starting Pinot controller in mode: {}.", _controllerMode.name());
     Utils.logVersions();
 
     // Set up controller metrics
     MetricsHelper.initializeMetrics(_config.subset(METRICS_REGISTRY_NAME));
     MetricsHelper.registerMetricsRegistry(_metricsRegistry);
 
-    Configuration pinotFSConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
-    Configuration segmentFetcherFactoryConfig =
-        _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
-    Configuration pinotCrypterConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
-
-    // Start all components
-    LOGGER.info("Initializing PinotFSFactory");
-    try {
-      PinotFSFactory.init(pinotFSConfig);
-    } catch (Exception e) {
-      Utils.rethrowException(e);
+    switch (_controllerMode) {
+      case DUAL:
+        setUpHelixController();
+        setUpPinotController();
+        break;
+      case PINOT_ONLY:
+        setUpPinotController();
+        break;
+      case HELIX_ONLY:
+        setUpHelixController();
+        break;
+      default:
+        LOGGER.error("Invalid mode: " + _controllerMode);
     }
 
-    LOGGER.info("Initializing SegmentFetcherFactory");
-    try {
-      SegmentFetcherFactory.getInstance().init(segmentFetcherFactoryConfig);
-    } catch (Exception e) {
-      throw new RuntimeException("Caught exception while initializing SegmentFetcherFactory", e);
-    }
+    ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+    _controllerMetrics.initializeGlobalMeters();
+  }
 
-    LOGGER.info("Initializing PinotCrypterFactory");
-    try {
-      PinotCrypterFactory.init(pinotCrypterConfig);
-    } catch (Exception e) {
-      throw new RuntimeException("Caught exception while initializing PinotCrypterFactory", e);
+  private void setUpHelixController() {
+    // Register and connect instance as Helix controller.
+    LOGGER.info("Starting Helix controller");
+    _helixControllerManager = HelixSetupUtils
+        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+
+    // Emit helix controller metrics
+    _controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L);
+    _controllerMetrics.addCallbackGauge("helix.leader", () -> _helixControllerManager.isLeader() ? 1L : 0L);
+    _helixControllerManager.addPreConnectCallback(
+        () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
+
+    _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixControllerManager));
+  }
+
+  private void setUpPinotController() {
+    // Note: Pinot-only mode is not yet allowed in production.
+    // For now this mode is used only in tests.
+    // TODO: Remove this logic once all the helix separation PRs are committed.
+    if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) {
+      throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
     }
 
+    // Start all components
+    initPinotFSFactory();
+    initSegmentFetcherFactory();
+    initPinotCrypterFactory();
+
     LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
     _helixResourceManager.start();
-    final HelixManager helixManager = _helixResourceManager.getHelixZkManager();
+    HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
 
     LOGGER.info("Init controller leadership manager");
-    ControllerLeadershipManager.init(helixManager);
+    // Note: Currently leadership depends on helix controller, thus assign helixControllerManager to ControllerLeadershipManager.
+    // TODO: In the future when Helix separation is completed, leadership only depends on the master in leadControllerResource, and ControllerLeadershipManager will be removed.
+    if (_helixControllerManager != null) {
+      ControllerLeadershipManager.init(_helixControllerManager);
+    } else {
+      ControllerLeadershipManager.init(helixParticipantManager);
+    }
 
     LOGGER.info("Starting task resource manager");
-    _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixManager));
+    _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixParticipantManager));
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
@@ -191,7 +251,7 @@ public class ControllerStarter {
     _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks);
 
     LOGGER.info("Creating rebalance segments factory");
-    RebalanceSegmentStrategyFactory.createInstance(helixManager);
+    RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
 
     String accessControlFactoryClass = _config.getAccessControlFactoryClass();
     LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
@@ -233,8 +293,6 @@ public class ControllerStarter {
     LOGGER.info("Controller services available at http://{}:{}/", _config.getControllerHost(),
         _config.getControllerPort());
 
-    _controllerMetrics.addCallbackGauge("helix.connected", () -> helixManager.isConnected() ? 1L : 0L);
-    _controllerMetrics.addCallbackGauge("helix.leader", () -> helixManager.isLeader() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
     _controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
       File dataDir = new File(_config.getDataDir());
@@ -256,9 +314,13 @@ public class ControllerStarter {
       }
     });
 
-    ServiceStatus.setServiceStatusCallback(new ServiceStatus.ServiceStatusCallback() {
+    _serviceStatusCallbackList.add(generateServiceStatusCallback(helixParticipantManager));
+  }
+
+  private ServiceStatus.ServiceStatusCallback generateServiceStatusCallback(HelixManager helixManager) {
+    return new ServiceStatus.ServiceStatusCallback() {
       private boolean _isStarted = false;
-      private String _statusDescription = "Helix ZK Not connected";
+      private String _statusDescription = "Helix ZK Not connected as " + helixManager.getInstanceType();
 
       @Override
       public ServiceStatus.Status getServiceStatus() {
@@ -285,11 +347,42 @@ public class ControllerStarter {
       public String getStatusDescription() {
         return _statusDescription;
       }
-    });
+    };
+  }
 
-    helixManager.addPreConnectCallback(
-        () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
-    _controllerMetrics.initializeGlobalMeters();
+  private void initPinotFSFactory() {
+    Configuration pinotFSConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
+    LOGGER.info("Initializing PinotFSFactory");
+    try {
+      PinotFSFactory.init(pinotFSConfig);
+    } catch (Exception e) {
+      Utils.rethrowException(e);
+    }
+  }
+
+  private void initSegmentFetcherFactory() {
+    Configuration segmentFetcherFactoryConfig =
+        _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
+    LOGGER.info("Initializing SegmentFetcherFactory");
+    try {
+      SegmentFetcherFactory.getInstance().init(segmentFetcherFactoryConfig);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while initializing SegmentFetcherFactory", e);
+    }
+  }
+
+  private void initPinotCrypterFactory() {
+    Configuration pinotCrypterConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+    LOGGER.info("Initializing PinotCrypterFactory");
+    try {
+      PinotCrypterFactory.init(pinotCrypterConfig);
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while initializing PinotCrypterFactory", e);
+    }
+  }
+
+  public ControllerConf.ControllerMode getControllerMode() {
+    return _controllerMode;
   }
 
   @VisibleForTesting
@@ -319,7 +412,28 @@ public class ControllerStarter {
   }
 
   public void stop() {
+    switch (_controllerMode) { // Stop only the component(s) matching the controller mode.
+      case DUAL:
+        stopPinotController(); // Pinot controller first, then the Helix controller.
+        stopHelixController();
+        break;
+      case PINOT_ONLY:
+        stopPinotController();
+        break;
+      case HELIX_ONLY:
+        stopHelixController();
+        break;
+    }
+  }
+
+  private void stopHelixController() { // Disconnects _helixControllerManager.
+    LOGGER.info("Disconnecting helix zk manager");
+    _helixControllerManager.disconnect();
+  }
+
+  private void stopPinotController() {
     try {
+      // Stopping ControllerLeadershipManager has to be done before stopping HelixResourceManager.
       LOGGER.info("Stopping controller leadership manager");
       ControllerLeadershipManager.getInstance().stop();
 
@@ -339,12 +453,19 @@ public class ControllerStarter {
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
+      LOGGER.info("Stopping rebalance segments factory");
+      RebalanceSegmentStrategyFactory.stop();
+
       _executorService.shutdownNow();
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
     }
   }
 
+  public boolean isPinotOnlyModeSupported() { // Overridable; TestOnlyControllerStarter returns true instead.
+    return false;
+  }
+
   public MetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5e15bff..04b6f9b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -46,6 +46,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
@@ -103,7 +104,6 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -128,7 +128,6 @@ public class PinotHelixResourceManager {
   private final String _dataDir;
   private final long _externalViewOnlineToOfflineTimeoutMillis;
   private final boolean _isSingleTenantCluster;
-  private final boolean _isUpdateStateModel;
   private final boolean _enableBatchMessageMode;
 
   private HelixManager _helixZkManager;
@@ -142,37 +141,34 @@ public class PinotHelixResourceManager {
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
-      boolean isSingleTenantCluster, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
+      boolean isSingleTenantCluster, boolean enableBatchMessageMode) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _instanceId = controllerInstanceId;
     _dataDir = dataDir;
     _externalViewOnlineToOfflineTimeoutMillis = externalViewOnlineToOfflineTimeoutMillis;
     _isSingleTenantCluster = isSingleTenantCluster;
-    _isUpdateStateModel = isUpdateStateModel;
     _enableBatchMessageMode = enableBatchMessageMode;
   }
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
     this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
-        false, false);
+        true);
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
         controllerConf.getControllerHost() + "_" + controllerConf.getControllerPort(), controllerConf.getDataDir(),
         controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
-        controllerConf.isUpdateSegmentStateModel(), controllerConf.getEnableBatchMessageMode());
+        controllerConf.getEnableBatchMessageMode());
   }
 
   /**
    * Create Helix cluster if needed, and then start a Pinot controller instance.
    */
   public synchronized void start() {
-    _helixZkManager = HelixSetupUtils
-        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
-    Preconditions.checkNotNull(_helixZkManager);
+    _helixZkManager = registerAndConnectAsHelixParticipant();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
     _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
@@ -254,6 +250,24 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Register and connect to Helix cluster as PARTICIPANT role.
+   */
+  private HelixManager registerAndConnectAsHelixParticipant() {
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
+            InstanceType.PARTICIPANT, _helixZkURL);
+    try {
+      helixManager.connect();
+      return helixManager;
+    } catch (Exception e) {
+      String errorMsg =
+          String.format("Exception when connecting the instance %s as Participant to Helix.", _instanceId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e); // Keep the cause so callers can see why connect() failed.
+    }
+  }
+
+  /**
    * Instance related APIs
    */
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7a883ca..c26780b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,6 +111,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
 
+  // TODO: fix the misuse of singleton.
   private static PinotLLCRealtimeSegmentManager INSTANCE = null;
 
   private final HelixAdmin _helixAdmin;
@@ -179,6 +180,8 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
+    INSTANCE = null;
+    SegmentCompletionManager.stop();
   }
 
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index b499e3e..7591898 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,6 +64,7 @@ public class SegmentCompletionManager {
     ABORTED,      // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
   }
 
+  // TODO: fix the misuse of singleton.
   private static SegmentCompletionManager _instance = null;
 
   private final HelixManager _helixManager;
@@ -1115,6 +1116,10 @@ public class SegmentCompletionManager {
     }
   }
 
+  public static void stop() { // Clears the static _instance reference.
+    _instance = null;
+  }
+
   @VisibleForTesting
   protected boolean isLeader() {
     return ControllerLeadershipManager.getInstance().isLeader();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
index 8e6ca40..30ac8af 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
@@ -28,6 +28,7 @@ import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy
  */
 public class RebalanceSegmentStrategyFactory {
 
+  // TODO: fix the misuse of singleton.
   private static RebalanceSegmentStrategyFactory INSTANCE = null;
 
   private HelixManager _helixManager;
@@ -63,4 +64,8 @@ public class RebalanceSegmentStrategyFactory {
         return new DefaultRebalanceSegmentStrategy(_helixManager);
     }
   }
+
+  public static void stop() { // Clears the static INSTANCE reference.
+    INSTANCE = null;
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
index 0329199..28049ec 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
@@ -46,9 +46,9 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
   @Test
   public void testInstanceListingAndCreation()
       throws Exception {
-    // Check that there are no instances
+    // Check that there is only one instance, which is the controller instance.
     JsonNode instanceList = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
-    assertEquals(instanceList.get("instances").size(), 0, "Expected empty instance list at beginning of test");
+    assertEquals(instanceList.get("instances").size(), 1, "Expected only one instance at beginning of test");
 
     // Create untagged broker and server instances
     ObjectNode brokerInstance =
@@ -59,17 +59,17 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
         (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"server\", \"port\":\"2345\"}");
     sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString());
 
-    // Check that there are two instances
+    // Check that there are three instances
     TestUtils.waitForCondition(aVoid -> {
       try {
         // Check that there are two instances
         return
             JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-                .size() == 2;
+                .size() == 3;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 500L, 10_000L, "Expected two instances after creation of tagged instances");
+    }, 500L, 10_000L, "Expected three instances after creation of tagged instances");
 
     // Create tagged broker and server instances
     brokerInstance.put("tag", "someTag");
@@ -83,14 +83,14 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
     // It may take some time for cache data accessor to update its data.
     TestUtils.waitForCondition(aVoid -> {
       try {
-        // Check that there are four instances
+        // Check that there are five instances
         return
             JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-                .size() == 4;
+                .size() == 5;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
-    }, 500L, 10_000L, "Expected four instances after creation of tagged instances");
+    }, 500L, 10_000L, "Expected five instances after creation of tagged instances");
 
     // Create duplicate broker and server instances (both calls should fail)
     try {
@@ -107,11 +107,11 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
       // Expected
     }
 
-    // Check that there are four instances
+    // Check that there are five instances
     JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
     assertEquals(
         JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-            .size(), 4, "Expected fore instances after creation of duplicate instances");
+            .size(), 5, "Expected five instances after creation of duplicate instances");
 
     // Check that the instances are properly created
     JsonNode instance = JsonUtils
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
index 18d1387..4ac9418 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
@@ -22,6 +22,7 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.helix.InstanceType;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -65,6 +66,7 @@ public class TableViewsTest extends ControllerTest {
     TableConfig tableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
             .setNumReplicas(2).build();
+    Assert.assertEquals(_helixManager.getInstanceType(), InstanceType.CONTROLLER);
     _helixResourceManager.addTable(tableConfig);
     _helixResourceManager
         .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
index d37cc65..8f72a59 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
@@ -55,6 +55,7 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
     _mockControllerStarter = new MockControllerStarter(config);
     _mockControllerStarter.start();
     _helixResourceManager = _mockControllerStarter.getHelixResourceManager();
+    _helixManager = _mockControllerStarter.getHelixControllerManager();
   }
 
   @Override
@@ -65,7 +66,12 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
     _mockControllerStarter = null;
   }
 
-  private class MockControllerStarter extends ControllerStarter {
+  @Override
+  protected ControllerStarter getControllerStarter() {
+    return _mockControllerStarter;
+  }
+
+  private class MockControllerStarter extends TestOnlyControllerStarter {
 
     private static final int NUM_PERIODIC_TASKS = 7;
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index b3d6d31..20bdcfb 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -105,11 +105,27 @@ public abstract class ControllerTest {
     return config;
   }
 
+  public class TestOnlyControllerStarter extends ControllerStarter { // ControllerStarter variant used by tests.
+
+    TestOnlyControllerStarter(ControllerConf conf) {
+      super(conf);
+    }
+
+    @Override
+    public boolean isPinotOnlyModeSupported() {
+      return true; // The base ControllerStarter returns false.
+    }
+  }
+
   protected void startController() {
     startController(getDefaultControllerConfiguration());
   }
 
   protected void startController(ControllerConf config) {
+    startController(config, true); // true = delete an existing "/<helixClusterName>" ZK node before starting.
+  }
+
+  protected void startController(ControllerConf config, boolean deleteCluster) {
     Assert.assertNotNull(config);
     Assert.assertNull(_controllerStarter);
 
@@ -123,21 +139,31 @@ public abstract class ControllerTest {
 
     String zkStr = config.getZkStr();
     _zkClient = new ZkClient(zkStr);
-    if (_zkClient.exists("/" + helixClusterName)) {
+    if (_zkClient.exists("/" + helixClusterName) && deleteCluster) {
       _zkClient.deleteRecursive("/" + helixClusterName);
     }
 
     startControllerStarter(config);
 
-    _helixManager = _helixResourceManager.getHelixZkManager();
-    _helixAdmin = _helixResourceManager.getHelixAdmin();
-    _propertyStore = _helixResourceManager.getPropertyStore();
+    // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
+    switch (getControllerStarter().getControllerMode()) {
+      case DUAL:
+      case PINOT_ONLY:
+        _helixAdmin = _helixResourceManager.getHelixAdmin();
+        _propertyStore = _helixResourceManager.getPropertyStore();
+        break;
+      case HELIX_ONLY:
+        _helixAdmin = _helixManager.getClusterManagmentTool();
+        _propertyStore = _helixManager.getHelixPropertyStore();
+        break;
+    }
   }
 
   protected void startControllerStarter(ControllerConf config) {
-    _controllerStarter = new ControllerStarter(config);
+    _controllerStarter = new TestOnlyControllerStarter(config);
     _controllerStarter.start();
     _helixResourceManager = _controllerStarter.getHelixResourceManager();
+    _helixManager = _controllerStarter.getHelixControllerManager();
   }
 
   protected void stopController() {
@@ -153,6 +179,10 @@ public abstract class ControllerTest {
     _controllerStarter = null;
   }
 
+  protected ControllerStarter getControllerStarter() { // Subclasses that build their own starter override this.
+    return _controllerStarter;
+  }
+
   protected Schema createDummySchema(String tableName) {
     Schema schema = new Schema();
     schema.setSchemaName(tableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
new file mode 100644
index 0000000..9bf70bd
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PinotControllerModeTest extends ControllerTest { // Starts a controller in each ControllerMode.
+  private static final long TIMEOUT_IN_MS = 10_000L;
+  private ControllerConf config;
+  private int controllerPortOffset;
+
+  @BeforeClass
+  public void setUp() {
+    startZk();
+    config = getDefaultControllerConfiguration();
+    controllerPortOffset = 0;
+  }
+
+  @Test
+  public void testHelixOnlyController()
+      throws Exception {
+    config.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+    startController(config);
+    TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
+        "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+
+    Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.HELIX_ONLY);
+
+    stopController();
+    _controllerStarter = null;
+  }
+
+  @Test
+  public void testDualModeController()
+      throws Exception {
+    config.setControllerMode(ControllerConf.ControllerMode.DUAL);
+    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+    startController(config);
+    TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
+        "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+    Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.DUAL);
+
+    stopController();
+    _controllerStarter = null;
+  }
+
+  @Test
+  public void testPinotOnlyController()
+      throws Exception {
+    config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+    // Starting pinot only controller before starting helix controller should fail.
+    try {
+      startController(config);
+      Assert.fail("Starting pinot only controller should fail!");
+    } catch (RuntimeException e) {
+      _controllerStarter = null;
+    }
+
+    // Starting a helix controller.
+    ControllerConf config2 = getDefaultControllerConfiguration();
+    config2.setHelixClusterName(getHelixClusterName());
+    config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+    config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+    ControllerStarter helixControllerStarter = new ControllerStarter(config2);
+    helixControllerStarter.start();
+    try {
+      HelixManager helixControllerManager = helixControllerStarter.getHelixControllerManager();
+      TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS,
+          "Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+      // Starting a pinot only controller.
+      startController(config, false);
+      TestUtils.waitForCondition(aVoid -> _helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
+          "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+      Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+      stopController();
+      _controllerStarter = null;
+    } finally { // Stop the auxiliary Helix controller even when a wait or assertion above fails.
+      helixControllerStarter.stop();
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    stopZk();
+  }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 550fe52..e6a29c6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -22,7 +22,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableConfig;
@@ -30,7 +29,6 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -38,14 +36,11 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class PinotResourceManagerTest {
-  private final static String HELIX_CLUSTER_NAME = "testCluster";
+public class PinotResourceManagerTest extends ControllerTest {
   private final static String TABLE_NAME = "testTable";
 
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private ZkClient _zkClient;
-  private PinotHelixResourceManager _pinotHelixResourceManager;
-  private HelixAdmin _helixAdmin;
 
   @BeforeClass
   public void setUp()
@@ -53,27 +48,22 @@ public class PinotResourceManagerTest {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
     _zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
 
-    final String instanceId = "localhost_helixController";
-    _pinotHelixResourceManager =
-        new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true,
-            /*isUpdateStateModel=*/ false, false);
-    _pinotHelixResourceManager.start();
-    _helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+    startController();
 
     ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
+        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
     ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 1);
+        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 1);
     Assert
-        .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size(), 1);
+        .assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size(), 1);
     Assert
-        .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(), 1);
+        .assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_REALTIME").size(), 1);
 
     // Adding table
     TableConfig tableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build();
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
   }
 
   @Test
@@ -83,18 +73,18 @@ public class PinotResourceManagerTest {
     segmentZKMetadata.setSegmentName("testSegment");
 
     // Segment ZK metadata does not exist
-    Assert.assertFalse(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+    Assert.assertFalse(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
 
     // Set segment ZK metadata
-    Assert.assertTrue(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata));
+    Assert.assertTrue(_helixResourceManager.updateZkMetadata(segmentZKMetadata));
 
     // Update ZK metadata
     Assert.assertEquals(
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 0);
-    Assert.assertTrue(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+        _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 0);
+    Assert.assertTrue(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
     Assert.assertEquals(
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 1);
-    Assert.assertFalse(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+        _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 1);
+    Assert.assertFalse(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
   }
 
   /**
@@ -112,16 +102,16 @@ public class PinotResourceManagerTest {
 
     // Basic add/delete case
     for (int i = 1; i <= 2; i++) {
-      _pinotHelixResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
+      _helixResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
     }
-    IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+    IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
     Set<String> segments = idealState.getPartitionSet();
     Assert.assertEquals(segments.size(), 2);
 
     for (String segmentName : segments) {
-      _pinotHelixResourceManager.deleteSegment(offlineTableName, segmentName);
+      _helixResourceManager.deleteSegment(offlineTableName, segmentName);
     }
-    idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
     Assert.assertEquals(idealState.getPartitionSet().size(), 0);
 
     // Concurrent segment deletion
@@ -131,7 +121,7 @@ public class PinotResourceManagerTest {
         @Override
         public void run() {
           for (int i = 0; i < 10; i++) {
-            _pinotHelixResourceManager
+            _helixResourceManager
                 .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
           }
         }
@@ -140,7 +130,7 @@ public class PinotResourceManagerTest {
     addSegmentExecutor.shutdown();
     addSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
 
-    idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
     Assert.assertEquals(idealState.getPartitionSet().size(), 30);
 
     ExecutorService deleteSegmentExecutor = Executors.newFixedThreadPool(3);
@@ -148,20 +138,20 @@ public class PinotResourceManagerTest {
       deleteSegmentExecutor.execute(new Runnable() {
         @Override
         public void run() {
-          _pinotHelixResourceManager.deleteSegment(offlineTableName, segmentName);
+          _helixResourceManager.deleteSegment(offlineTableName, segmentName);
         }
       });
     }
     deleteSegmentExecutor.shutdown();
     deleteSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
 
-    idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+    idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
     Assert.assertEquals(idealState.getPartitionSet().size(), 0);
   }
 
   @AfterClass
   public void tearDown() {
-    _pinotHelixResourceManager.stop();
+    _helixResourceManager.stop();
     _zkClient.close();
     ZkStarter.stopLocalZkServer(_zookeeperInstance);
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 3879806..87fa60f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.I0Itec.zkclient.ZkClient;
-import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -37,9 +36,7 @@ import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerato
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
-import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.testng.Assert;
@@ -48,9 +45,8 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 
-public class SegmentAssignmentStrategyTest {
+public class SegmentAssignmentStrategyTest extends ControllerTest {
   private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
-  private final static String HELIX_CLUSTER_NAME = "TestSegmentAssignmentStrategyHelix";
   private final static String TABLE_NAME_BALANCED = "testResourceBalanced";
   private final static String TABLE_NAME_RANDOM = "testResourceRandom";
   private final static String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT = "testReplicaGroupPartitionAssignment";
@@ -60,10 +56,7 @@ public class SegmentAssignmentStrategyTest {
   private static final Random random = new Random();
   private final static String PARTITION_COLUMN = "memberId";
   private final static int NUM_REPLICA = 2;
-  private PinotHelixResourceManager _pinotHelixResourceManager;
   private ZkClient _zkClient;
-  private HelixManager _helixZkManager;
-  private HelixAdmin _helixAdmin;
   private final int _numServerInstance = 10;
   private final int _numBrokerInstance = 1;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
@@ -74,40 +67,33 @@ public class SegmentAssignmentStrategyTest {
       throws Exception {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
     _zkClient = new ZkClient(ZK_SERVER);
-    final String zkPath = "/" + HELIX_CLUSTER_NAME;
+    final String zkPath = "/" + getHelixClusterName();
     if (_zkClient.exists(zkPath)) {
       _zkClient.deleteRecursive(zkPath);
     }
-    final String instanceId = "localhost_helixController";
-    _pinotHelixResourceManager = new PinotHelixResourceManager(ZK_SERVER, HELIX_CLUSTER_NAME, instanceId, null, 10000L,
-        true, /*isUpdateStateModel=*/
-        false, false);
-    _pinotHelixResourceManager.start();
-
-    final String helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(ZK_SERVER);
-    _helixZkManager =
-        HelixSetupUtils.setup(HELIX_CLUSTER_NAME, helixZkURL, instanceId, /*isUpdateStateModel=*/false, true);
-    _helixAdmin = _helixZkManager.getClusterManagmentTool();
+    startController();
+    HelixManager helixZkManager = _helixResourceManager.getHelixZkManager();
+
     _partitionAssignmentGenerator =
-        new ReplicaGroupPartitionAssignmentGenerator(_helixZkManager.getHelixPropertyStore());
+        new ReplicaGroupPartitionAssignmentGenerator(helixZkManager.getHelixPropertyStore());
 
     ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_SERVER, _numServerInstance, true);
+        .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_SERVER, _numServerInstance, true);
     ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_SERVER, _numBrokerInstance, true);
+        .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_SERVER, _numBrokerInstance, true);
     Thread.sleep(100);
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size(),
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size(),
         _numServerInstance);
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(),
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_REALTIME").size(),
         _numServerInstance);
 
-    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(),
+    Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
         _numBrokerInstance);
   }
 
   @AfterTest
   public void tearDown() {
-    _pinotHelixResourceManager.stop();
+    _helixResourceManager.stop();
     _zkClient.close();
     ZkStarter.stopLocalZkServer(_zookeeperInstance);
   }
@@ -119,15 +105,15 @@ public class SegmentAssignmentStrategyTest {
     TableConfig tableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_RANDOM)
             .setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
     // Wait for the table addition
-    while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_RANDOM)) {
+    while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_RANDOM)) {
       Thread.sleep(100);
     }
 
     for (int i = 0; i < 10; ++i) {
-      _pinotHelixResourceManager
+      _helixResourceManager
           .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME_RANDOM), "downloadUrl");
 
       // Wait for all segments to appear in the external view
@@ -135,13 +121,13 @@ public class SegmentAssignmentStrategyTest {
         Thread.sleep(100);
       }
       final Set<String> taggedInstances =
-          _pinotHelixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+          _helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
       final Map<String, Integer> instanceToNumSegmentsMap = new HashMap<>();
       for (final String instance : taggedInstances) {
         instanceToNumSegmentsMap.put(instance, 0);
       }
       IdealState idealState = _helixAdmin
-          .getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
+          .getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
       Assert.assertEquals(idealState.getPartitionSet().size(), i + 1);
       for (final String segmentId : idealState.getPartitionSet()) {
         Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(), NUM_REPLICA);
@@ -158,11 +144,11 @@ public class SegmentAssignmentStrategyTest {
     TableConfig tableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_BALANCED)
             .setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy").setNumReplicas(numReplicas).build();
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
     int numSegments = 20;
     for (int i = 0; i < numSegments; ++i) {
-      _pinotHelixResourceManager
+      _helixResourceManager
           .addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME_BALANCED), "downloadUrl");
     }
 
@@ -172,13 +158,13 @@ public class SegmentAssignmentStrategyTest {
     }
 
     final Set<String> taggedInstances =
-        _pinotHelixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+        _helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
     final Map<String, Integer> instance2NumSegmentsMap = new HashMap<>();
     for (final String instance : taggedInstances) {
       instance2NumSegmentsMap.put(instance, 0);
     }
     final IdealState idealState = _helixAdmin
-        .getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
+        .getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
     for (final String segmentId : idealState.getPartitionSet()) {
       for (final String instance : idealState.getInstanceStateMap(segmentId).keySet()) {
         instance2NumSegmentsMap.put(instance, instance2NumSegmentsMap.get(instance) + 1);
@@ -196,7 +182,7 @@ public class SegmentAssignmentStrategyTest {
       Assert.assertTrue(instance2NumSegmentsMap.get(instance) <= maxNumSegmentsPerInstance,
           "expected <=" + maxNumSegmentsPerInstance + " actual:" + instance2NumSegmentsMap.get(instance));
     }
-    _helixAdmin.dropResource(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
+    _helixAdmin.dropResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
   }
 
   @Test
@@ -209,7 +195,7 @@ public class SegmentAssignmentStrategyTest {
     TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
         .setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
         .setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
     // Check that partition assignment does not exist
     ReplicaGroupPartitionAssignment partitionAssignment =
@@ -230,23 +216,23 @@ public class SegmentAssignmentStrategyTest {
     replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
 
     // Check that the replica group partition assignment is created
-    _pinotHelixResourceManager
+    _helixResourceManager
         .setExistingTableConfig(replicaGroupTableConfig, tableNameWithType, CommonConstants.Helix.TableType.OFFLINE);
     partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
     Assert.assertTrue(partitionAssignment != null);
 
     // After table deletion, check that the replica group partition assignment is deleted
-    _pinotHelixResourceManager.deleteOfflineTable(rawTableName);
+    _helixResourceManager.deleteOfflineTable(rawTableName);
     partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
     Assert.assertTrue(partitionAssignment == null);
 
     // Create a table with replica group
-    _pinotHelixResourceManager.addTable(replicaGroupTableConfig);
+    _helixResourceManager.addTable(replicaGroupTableConfig);
     partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
     Assert.assertTrue(partitionAssignment != null);
 
     // Check that the replica group partition assignment is deleted
-    _pinotHelixResourceManager.deleteOfflineTable(rawTableName);
+    _helixResourceManager.deleteOfflineTable(rawTableName);
     partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
     Assert.assertTrue(partitionAssignment == null);
   }
@@ -268,17 +254,17 @@ public class SegmentAssignmentStrategyTest {
     tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
 
     // Create the table
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
     // Wait for table addition
-    while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP)) {
+    while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP)) {
       Thread.sleep(100);
     }
 
     // Upload segments
     Map<Integer, Set<String>> segmentsPerPartition = ReplicaGroupTestUtils
         .uploadMultipleSegmentsWithPartitionNumber(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP, 20, null,
-            _pinotHelixResourceManager, 1);
+            _helixResourceManager, 1);
 
     // Wait for all segments to appear in the ideal state
     while (!allSegmentsPushedToIdealState(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP, 20)) {
@@ -290,7 +276,7 @@ public class SegmentAssignmentStrategyTest {
     ReplicaGroupPartitionAssignment partitionAssignment =
         _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(offlineTableName);
 
-    IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME,
+    IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
         TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP));
 
     // Validate the segment assignment
@@ -328,17 +314,17 @@ public class SegmentAssignmentStrategyTest {
     tableConfig.setIndexingConfig(indexingConfig);
 
     // Add table
-    _pinotHelixResourceManager.addTable(tableConfig);
+    _helixResourceManager.addTable(tableConfig);
 
     // Wait for table addition
-    while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP)) {
+    while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP)) {
       Thread.sleep(100);
     }
 
     // Upload segments
     Map<Integer, Set<String>> segmentsPerPartition = ReplicaGroupTestUtils
         .uploadMultipleSegmentsWithPartitionNumber(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP, numSegments,
-            PARTITION_COLUMN, _pinotHelixResourceManager, totalPartitionNumber);
+            PARTITION_COLUMN, _helixResourceManager, totalPartitionNumber);
 
     // Wait for all segments to appear in the ideal state
     while (!allSegmentsPushedToIdealState(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP, numSegments)) {
@@ -350,7 +336,7 @@ public class SegmentAssignmentStrategyTest {
     ReplicaGroupPartitionAssignment partitionAssignment =
         _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(offlineTable);
 
-    IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME,
+    IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
         TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP));
 
     // Validate the segment assignment
@@ -361,7 +347,7 @@ public class SegmentAssignmentStrategyTest {
 
   private boolean allSegmentsPushedToIdealState(String tableName, int segmentNum) {
     IdealState idealState =
-        _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+        _helixAdmin.getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(tableName));
     return idealState != null && idealState.getPartitionSet() != null
         && idealState.getPartitionSet().size() == segmentNum;
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 27b124f..e039d73 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -36,8 +36,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -54,18 +53,14 @@ import static org.testng.Assert.assertEquals;
 /**
  * Tests for the ValidationManagers.
  */
-public class ValidationManagerTest {
-  private String HELIX_CLUSTER_NAME = "TestValidationManager";
-
+public class ValidationManagerTest extends ControllerTest {
   private static final String ZK_STR = ZkStarter.DEFAULT_ZK_STR;
-  private static final String CONTROLLER_INSTANCE_NAME = "localhost_11984";
   private static final String TEST_TABLE_NAME = "testTable";
   private static final String TEST_TABLE_TWO = "testTable2";
   private static final String TEST_SEGMENT_NAME = "testSegment";
 
   private ZkClient _zkClient;
 
-  private PinotHelixResourceManager _pinotHelixResourceManager;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private TableConfig _offlineTableConfig;
   private HelixManager _helixManager;
@@ -77,22 +72,16 @@ public class ValidationManagerTest {
     _zkClient = new ZkClient(ZK_STR);
     Thread.sleep(1000);
 
-    _pinotHelixResourceManager =
-        new PinotHelixResourceManager(ZK_STR, HELIX_CLUSTER_NAME, CONTROLLER_INSTANCE_NAME, null, 1000L,
-            true, /*isUpdateStateModel=*/
-            false, false);
-    _pinotHelixResourceManager.start();
+    startController();
 
-    ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_STR, 2, true);
-    ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_STR, 2, true);
+    ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_STR, 2, true);
+    ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_STR, 2, true);
 
     _offlineTableConfig =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
             .build();
-
-    final String instanceId = "localhost_helixController";
-    _helixManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, ZK_STR, instanceId, /*isUpdateStateModel=*/false, true);
-    _pinotHelixResourceManager.addTable(_offlineTableConfig);
+    _helixManager = _helixResourceManager.getHelixZkManager();
+    _helixResourceManager.addTable(_offlineTableConfig);
   }
 
   @Test
@@ -102,16 +91,16 @@ public class ValidationManagerTest {
     String partitionName = _offlineTableConfig.getTableName();
     HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
 
-    IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+    IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
     // Ensure that the broker resource is not rebuilt.
     Assert.assertTrue(idealState.getInstanceSet(partitionName)
-        .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
-    _pinotHelixResourceManager.rebuildBrokerResourceFromHelixTags(partitionName);
+        .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+    _helixResourceManager.rebuildBrokerResourceFromHelixTags(partitionName);
 
     // Add another table that needs to be rebuilt
     TableConfig offlineTableConfigTwo =
         new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_TWO).build();
-    _pinotHelixResourceManager.addTable(offlineTableConfigTwo);
+    _helixResourceManager.addTable(offlineTableConfigTwo);
     String partitionNameTwo = offlineTableConfigTwo.getTableName();
 
     // Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
@@ -120,18 +109,18 @@ public class ValidationManagerTest {
     instanceConfig.setInstanceEnabled(true);
     instanceConfig.setHostName("Broker_localhost");
     instanceConfig.setPort("2");
-    helixAdmin.addInstance(HELIX_CLUSTER_NAME, instanceConfig);
-    helixAdmin.addInstanceTag(HELIX_CLUSTER_NAME, instanceConfig.getInstanceName(),
+    helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+    helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
         TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
-    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
     // Assert that the two don't equal before the call to rebuild the broker resource.
     Assert.assertTrue(!idealState.getInstanceSet(partitionNameTwo)
-        .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
-    _pinotHelixResourceManager.rebuildBrokerResourceFromHelixTags(partitionNameTwo);
-    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+        .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+    _helixResourceManager.rebuildBrokerResourceFromHelixTags(partitionNameTwo);
+    idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
     // Assert that the two do equal after being rebuilt.
     Assert.assertTrue(idealState.getInstanceSet(partitionNameTwo)
-        .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+        .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
   }
 
   @Test
@@ -139,9 +128,9 @@ public class ValidationManagerTest {
       throws Exception {
     SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
 
-    _pinotHelixResourceManager.addNewSegment(segmentMetadata, "http://dummy/");
+    _helixResourceManager.addNewSegment(segmentMetadata, "http://dummy/");
     OfflineSegmentZKMetadata offlineSegmentZKMetadata =
-        _pinotHelixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
+        _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
     long pushTime = offlineSegmentZKMetadata.getPushTime();
     // Check that the segment has been pushed in the last 30 seconds
     Assert.assertTrue(System.currentTimeMillis() - pushTime < 30_000);
@@ -150,10 +139,10 @@ public class ValidationManagerTest {
 
     // Refresh the segment
     Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
-    _pinotHelixResourceManager.refreshSegment(segmentMetadata, offlineSegmentZKMetadata);
+    _helixResourceManager.refreshSegment(segmentMetadata, offlineSegmentZKMetadata);
 
     offlineSegmentZKMetadata =
-        _pinotHelixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
+        _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
     // Check that the segment still has the same push time
     assertEquals(offlineSegmentZKMetadata.getPushTime(), pushTime);
     // Check that the refresh time is in the last 30 seconds
@@ -199,7 +188,7 @@ public class ValidationManagerTest {
 
   @AfterClass
   public void shutDown() {
-    _pinotHelixResourceManager.stop();
+    _helixResourceManager.stop();
     _zkClient.close();
     ZkStarter.stopLocalZkServer(_zookeeperInstance);
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 1efeeeb..e8cdb68 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -171,8 +171,8 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
   @Test
   public void testPinotHelixResourceManagerAPIs() {
     // Instance APIs
-    Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 6);
-    Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 6);
+    Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 7);
+    Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 7);
     Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), 0);
     Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0);
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 172b2fa..85c8b15 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -516,7 +516,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
     JsonNode instanceList = response.get("instances");
     int numInstances = instanceList.size();
-    assertEquals(numInstances, getNumBrokers() + getNumServers());
+    // The total number of instances is the number of brokers plus the number of servers plus 1 controller.
+    assertEquals(numInstances, getNumBrokers() + getNumServers() + 1);
 
     // Try to delete a server that does not exist
     String deleteInstanceRequest = _controllerRequestURLBuilder.forInstanceDelete("potato");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
index db56895..45d36a2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
@@ -51,7 +51,10 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
   @Option(name = "-clusterName", required = false, metaVar = "<String>", usage = "Pinot cluster name.")
   private String _clusterName = DEFAULT_CLUSTER_NAME;
 
-  @Option(name = "-configFileName", required = false, metaVar = "<FilePathName>", usage = "Controller Starter config file", forbids = {"-controllerHost", "-controllerPort", "-dataDir", "-zkAddress", "-clusterName"})
+  @Option(name = "-controllerMode", required = false, metaVar = "<String>", usage = "Pinot controller mode.")
+  private ControllerConf.ControllerMode _controllerMode = ControllerConf.ControllerMode.DUAL;
+
+  @Option(name = "-configFileName", required = false, metaVar = "<FilePathName>", usage = "Controller Starter config file", forbids = {"-controllerHost", "-controllerPort", "-dataDir", "-zkAddress", "-clusterName", "-controllerMode"})
   private String _configFileName;
 
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
@@ -150,6 +153,8 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
         conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
         conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
         conf.setBrokerResourceValidationFrequencyInSeconds(3600);
+
+        conf.setControllerMode(_controllerMode);
       }
 
       LOGGER.info("Executing command: " + toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org