You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/07 05:18:22 UTC

[incubator-pinot] branch master updated: Create leadControllerResource in helix cluster (#4047)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7566790  Create leadControllerResource in helix cluster (#4047)
7566790 is described below

commit 75667900a7ad4739069aa2220b778ab7d7b12a6b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Jun 6 22:18:16 2019 -0700

    Create leadControllerResource in helix cluster (#4047)
    
    * Create leadControllerResource in Helix cluster
    
    * Separate Helix cluster creation logic
    
    * Enabled delay rebalance on lead controller resource, set delay time to 5 mins, minActiveReplicas to 0, numReplicas to 1.
---
 .../apache/pinot/common/config/TagNameUtils.java   |  30 +--
 .../apache/pinot/common/utils/CommonConstants.java |  10 +
 .../apache/pinot/controller/ControllerStarter.java |   6 +-
 .../helix/core/PinotHelixResourceManager.java      |  76 +++---
 .../helix/core/util/HelixSetupUtils.java           | 286 ++++++++++++++-------
 .../controller/helix/PinotControllerModeTest.java  | 113 +++++++-
 .../helix/core/PinotHelixResourceManagerTest.java  | 180 ++++++++++---
 7 files changed, 498 insertions(+), 203 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
index f1ab25e..a22e4fa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.common.config;
 
-import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.utils.ServerType;
 import org.apache.pinot.common.utils.TenantRole;
 
@@ -38,25 +37,20 @@ public class TagNameUtils {
     return tenantName + "_" + TenantRole.BROKER.toString();
   }
 
-  public static boolean hasValidServerTagSuffix(String tagName) {
-    if (tagName.endsWith(ServerType.REALTIME.toString()) || tagName.endsWith(ServerType.OFFLINE.toString())) {
-      return true;
-    }
-    return false;
+  public static boolean isServerTag(String tagName) {
+    return isOfflineServerTag(tagName) || isRealtimeServerTag(tagName);
   }
 
-  public static TenantRole getTenantRoleFromTag(String tagName)
-      throws InvalidConfigException {
-    if (tagName.endsWith(ServerType.REALTIME.toString())) {
-      return TenantRole.SERVER;
-    }
-    if (tagName.endsWith(ServerType.OFFLINE.toString())) {
-      return TenantRole.SERVER;
-    }
-    if (tagName.endsWith(TenantRole.BROKER.toString())) {
-      return TenantRole.BROKER;
-    }
-    throw new InvalidConfigException("Cannot identify tenant type from tag name : " + tagName);
+  public static boolean isOfflineServerTag(String tagName) {
+    return tagName.endsWith(ServerType.OFFLINE.toString());
+  }
+
+  public static boolean isRealtimeServerTag(String tagName) {
+    return tagName.endsWith(ServerType.REALTIME.toString());
+  }
+
+  public static boolean isBrokerTag(String tagName) {
+    return tagName.endsWith(TenantRole.BROKER.toString());
   }
 
   public static String getTagFromTenantAndServerType(String tenantName, ServerType type) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 915e96e..8132cdb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -37,8 +37,18 @@ public class CommonConstants {
 
     public static final String SERVER_INSTANCE_TYPE = "server";
     public static final String BROKER_INSTANCE_TYPE = "broker";
+    public static final String CONTROLLER_INSTANCE_TYPE = "controller";
 
     public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
+    public static final String LEAD_CONTROLLER_RESOURCE_NAME = "leadControllerResource";
+
+    // More information on why these numbers are set can be found in the following doc:
+    // https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
+    public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 24;
+    public static final int LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT = 1;
+    public static final boolean ENABLE_DELAY_REBALANCE = true;
+    public static final int MIN_ACTIVE_REPLICAS = 0;
+    public static final long REBALANCE_DELAY_MS = 300_000L; // 5 minutes.
 
     public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
     public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index f57d21f..c313616 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,8 +219,7 @@ public class ControllerStarter {
   private void setUpHelixController() {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
-    _helixControllerManager = HelixSetupUtils
-        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+    _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId);
 
     // Emit helix controller metrics
     _controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -244,6 +243,9 @@ public class ControllerStarter {
       throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
     }
 
+    // Set up Pinot cluster in Helix
+    HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode);
+
     // Start all components
     initPinotFSFactory();
     initSegmentFetcherFactory();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index aaa9ea8..877458d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,6 +53,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -60,6 +61,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.OfflineTagConfig;
@@ -92,7 +94,6 @@ import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnli
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.SchemaUtils;
-import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -157,15 +158,10 @@ public class PinotHelixResourceManager {
     _allowHLCTables = allowHLCTables;
   }
 
-  public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
-      @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
-    this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
-        true, true);
-  }
-
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        controllerConf.getControllerHost() + "_" + controllerConf.getControllerPort(), controllerConf.getDataDir(),
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
+            + controllerConf.getControllerPort(), controllerConf.getDataDir(),
         controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
         controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
   }
@@ -185,6 +181,10 @@ public class PinotHelixResourceManager {
     _cacheInstanceConfigsDataAccessor =
         new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs, null,
             Collections.singletonList(instanceConfigs));
+
+    // Add instance group tag for controller
+    addInstanceGroupTagIfNeeded();
+
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -259,9 +259,13 @@ public class PinotHelixResourceManager {
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
   private HelixManager registerAndConnectAsHelixParticipant() {
-    HelixManager helixManager = HelixManagerFactory
-        .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
-            InstanceType.PARTICIPANT, _helixZkURL);
+    HelixManager helixManager =
+        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
+
+    // Register the Master-Slave state model with the state machine engine; it is used for calculating participant assignment in the lead controller resource.
+    helixManager.getStateMachineEngine()
+        .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+
     try {
       helixManager.connect();
       return helixManager;
@@ -274,6 +278,20 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Add instance group tag for controller so that pinot controller can be assigned to lead controller resource.
+   */
+  private void addInstanceGroupTagIfNeeded() {
+    InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
+    if (!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
+      LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId,
+          CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+      accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId), instanceConfig);
+    }
+  }
+
+  /**
    * Instance related APIs
    */
 
@@ -835,20 +853,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
-          continue;
-        }
-        TenantRole tenantRole;
-        try {
-          tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
-          if (tenantRole == TenantRole.BROKER) {
-            tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
-          }
-        } catch (InvalidConfigException e) {
-          LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, tag);
-          continue;
+        if (TagNameUtils.isBrokerTag(tag)) {
+          tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
         }
       }
     }
@@ -861,20 +867,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
-          continue;
-        }
-        TenantRole tenantRole;
-        try {
-          tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
-          if (tenantRole == TenantRole.SERVER) {
-            tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
-          }
-        } catch (InvalidConfigException e) {
-          LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, tag);
-          continue;
+        if (TagNameUtils.isServerTag(tag)) {
+          tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
         }
       }
     }
@@ -1176,7 +1170,7 @@ public class PinotHelixResourceManager {
     if (tagOverrideConfig != null) {
       String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
       if (realtimeConsumingTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+        if (!TagNameUtils.isServerTag(realtimeConsumingTag)) {
           throw new InvalidTableConfigException(
               "Invalid realtime consuming tag: " + realtimeConsumingTag + " for table " + tableNameWithType
                   + ". Must have suffix _REALTIME or _OFFLINE");
@@ -1190,7 +1184,7 @@ public class PinotHelixResourceManager {
 
       String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
       if (realtimeCompletedTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+        if (!TagNameUtils.isServerTag(realtimeCompletedTag)) {
           throw new InvalidTableConfigException(
               "Invalid realtime completed tag: " + realtimeCompletedTag + " for table " + tableNameWithType
                   + ". Must have suffix _REALTIME or _OFFLINE");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index b8707f7..f65a8ce 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.controller.helix.core.util;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,10 +29,10 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -53,6 +56,11 @@ import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS;
+
 
 /**
  * HelixSetupUtils handles how to create or get a helixCluster in controller.
@@ -60,119 +68,205 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HelixSetupUtils {
-
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class);
 
   public static synchronized HelixManager setup(String helixClusterName, String zkPath,
-      String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
+      String helixControllerInstanceId) {
+    setupHelixCluster(helixClusterName, zkPath);
 
-    try {
-      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode);
-    } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
-      return null;
-    }
+    return startHelixControllerInStandadloneMode(helixClusterName, zkPath, helixControllerInstanceId);
+  }
 
-    try {
-      return startHelixControllerInStandadloneMode(helixClusterName, zkPath, pinotControllerInstanceId);
-    } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
-      return null;
+  /**
+   * Set up a brand new Helix cluster if it doesn't exist.
+   */
+  public static void setupHelixCluster(String helixClusterName, String zkPath) {
+    final HelixAdmin admin = new ZKHelixAdmin(zkPath);
+    if (admin.getClusters().contains(helixClusterName)) {
+      LOGGER.info("Helix cluster: {} already exists", helixClusterName);
+      return;
     }
+    LOGGER.info("Creating a new Helix cluster: {}", helixClusterName);
+    admin.addCluster(helixClusterName, false);
+    LOGGER.info("New Cluster: {} created.", helixClusterName);
+  }
+
+  private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl,
+      String pinotControllerInstanceId) {
+    LOGGER.info("Starting Helix Standalone Controller ... ");
+    return HelixControllerMain
+        .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE);
   }
 
-  public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel,
+  /**
+   * Customizes existing Helix cluster to run Pinot components.
+   */
+  public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
       boolean enableBatchMessageMode) {
     final HelixAdmin admin = new ZKHelixAdmin(zkPath);
-    final String segmentStateModelName =
-        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    Preconditions.checkState(admin.getClusters().contains(helixClusterName),
+        String.format("Helix cluster: %s hasn't been set up", helixClusterName));
 
-    if (admin.getClusters().contains(helixClusterName)) {
-      LOGGER.info("cluster already exists ********************************************* ");
-      if (isUpdateStateModel) {
-        final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
-        List<String> states = curStateModelDef.getStatesPriorityList();
-        if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
-          LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
-          return;
-        } else {
-          LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
-          StateModelDefinition newStateModelDef =
-              PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
-          ZkClient zkClient = new ZkClient(zkPath);
-          zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
-          zkClient.setZkSerializer(new ZNRecordSerializer());
-          HelixDataAccessor accessor =
-              new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-          PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-          accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
-          LOGGER.info("Completed updating statemodel {}", segmentStateModelName);
-          zkClient.close();
-        }
-      }
-      return;
-    }
+    // Ensure auto join.
+    ensureAutoJoin(helixClusterName, admin);
 
-    LOGGER.info("Creating a new cluster, as the helix cluster : " + helixClusterName
-        + " was not found ********************************************* ");
-    admin.addCluster(helixClusterName, false);
+    // Add segment state model definition if needed
+    addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel);
+
+    // Add broker resource if needed
+    createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
 
-    LOGGER.info("Enable auto join.");
+    // Add lead controller resource if needed
+    createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
+
+    // Init property store if needed
+    initPropertyStoreIfNeeded(helixClusterName, zkPath);
+  }
+
+  private static void ensureAutoJoin(String helixClusterName, HelixAdmin admin) {
     final HelixConfigScope scope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
+    String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS;
+    List<String> keys = new ArrayList<>();
+    keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN);
+    keys.add(stateTransitionMaxThreads);
+    Map<String, String> configs = admin.getConfig(scope, keys);
+    if (!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN))) {
+      configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.TRUE.toString());
+    }
+    if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) {
+      configs.put(stateTransitionMaxThreads, String.valueOf(1));
+    }
+    admin.setConfig(scope, configs);
+  }
 
-    final Map<String, String> props = new HashMap<String, String>();
-    props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
-    //we need only one segment to be loaded at a time
-    props.put(MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS, String.valueOf(1));
-
-    admin.setConfig(scope, props);
-
-    LOGGER.info(
-        "Adding state model {} (with CONSUMED state) generated using {} **********************************************",
-        segmentStateModelName, PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-
-    // If this is a fresh cluster we are creating, then the cluster will see the CONSUMING state in the
-    // state model. But then the servers will never be asked to go to that STATE (whether they have the code
-    // to handle it or not) unil we complete the feature using low-level consumers and turn the feature on.
-    admin.addStateModelDef(helixClusterName, segmentStateModelName,
-        PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding state model definition named : "
-        + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL
-        + " generated using : " + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()
-        + " ********************************************** ");
-
-    admin.addStateModelDef(helixClusterName,
-        PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
-        PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding empty ideal state for Broker!");
-    HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
-        helixClusterName, admin);
-    IdealState idealState = PinotTableIdealStateBuilder
-        .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
-    admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
-    initPropertyStorePath(helixClusterName, zkPath);
-    LOGGER.info("New Cluster setup completed... ********************************************** ");
+  private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath,
+      boolean isUpdateStateModel) {
+    final String segmentStateModelName =
+        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName);
+    if (stateModelDefinition == null) {
+      LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName,
+          PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
+      admin.addStateModelDef(helixClusterName, segmentStateModelName,
+          PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    } else if (isUpdateStateModel) {
+      final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
+      List<String> states = curStateModelDef.getStatesPriorityList();
+      if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
+        LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
+      } else {
+        LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
+        StateModelDefinition newStateModelDef =
+            PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
+        ZkClient zkClient = new ZkClient(zkPath);
+        zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
+        LOGGER.info("Completed updating state model {}", segmentStateModelName);
+        zkClient.close();
+      }
+    }
   }
 
-  private static void initPropertyStorePath(String helixClusterName, String zkPath) {
-    String propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, helixClusterName);
-    ZkHelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<ZNRecord>(zkPath, new ZNRecordSerializer(), propertyStorePath);
-    propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
-    propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
-    propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+  private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+      boolean enableBatchMessageMode) {
+    // Add broker resource online offline state model definition if needed
+    StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName,
+        PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
+    if (brokerResourceStateModelDefinition == null) {
+      LOGGER.info("Adding state model definition named : {} generated using : {}",
+          PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+          PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
+      admin.addStateModelDef(helixClusterName,
+          PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+          PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
+
+    // Create broker resource if needed.
+    IdealState brokerResourceIdealState =
+        admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    if (brokerResourceIdealState == null) {
+      LOGGER.info("Adding empty ideal state for Broker!");
+      HelixHelper
+          .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
+              admin);
+      IdealState idealState = PinotTableIdealStateBuilder
+          .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
+      admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+    }
   }
 
-  private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl,
-      String pinotControllerInstanceId) {
-    LOGGER.info("Starting Helix Standalone Controller ... ");
-    return HelixControllerMain
-        .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE);
+  private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+      boolean enableBatchMessageMode) {
+    StateModelDefinition masterSlaveStateModelDefinition =
+        admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
+    if (masterSlaveStateModelDefinition == null) {
+      LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name,
+          MasterSlaveSMD.class.toString());
+      admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build());
+    }
+
+    IdealState leadControllerResourceIdealState =
+        admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+    if (leadControllerResourceIdealState == null) {
+      LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+      HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
+      // FULL-AUTO Master-Slave state model with the CrushEd rebalance strategy.
+      admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+          CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name,
+          IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName());
+
+      // Set instance group tag for lead controller resource.
+      IdealState leadControllerIdealState =
+          admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+      leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+      // The config below guarantees that no partition movement happens as long as the number of active replicas is no less than the minimum active replicas.
+      // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, the Helix controller waits at most 5 minutes and then re-calculates the participant assignment.
+      // This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently.
+      // Plus, if virtual ids are applied to controller hosts, swapping hosts is easy: new hosts can reuse the same virtual id, so minimal config changes are needed.
+      leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+      leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+      leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
+      admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
+
+      // Explicitly disable this resource when it is first created, for backward compatibility.
+      // Once all the controllers are running code that contains the logic to handle this resource, it can be enabled.
+      // In the next major release, we can enable this resource by default, so that all the controller logic can be separated.
+      admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+      LOGGER.info("Re-balance lead controller resource with replicas: {}",
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+      // Set it to 1 so that there's only 1 instance (i.e. master) shown in every partition.
+      admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+    }
+  }
+
+  private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) {
+    String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName);
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath);
+    if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index f3da797..d91e612 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.controller.helix;
 
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -62,18 +67,37 @@ public class PinotControllerModeTest extends ControllerTest {
     config.setControllerMode(ControllerConf.ControllerMode.DUAL);
     config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
 
+    // Helix cluster will be set up when starting the first controller.
     startController(config);
     TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
         "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
     Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.DUAL);
 
+    // Enable the lead controller resource.
+    _helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+    // Starting a second dual-mode controller. Helix cluster has already been set up.
+    ControllerConf controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.setHelixClusterName(getHelixClusterName());
+    controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+    controllerConfig.setControllerPort(
+        Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++));
+
+    ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig);
+    secondDualModeController.start();
+    TestUtils
+        .waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
+            TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+    Assert.assertEquals(secondDualModeController.getControllerMode(), ControllerConf.ControllerMode.DUAL);
+
+    secondDualModeController.stop();
     stopController();
     _controllerStarter = null;
   }
 
   // TODO: enable it after removing ControllerLeadershipManager which requires both CONTROLLER and PARTICIPANT
   //       HelixManager
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testPinotOnlyController()
       throws Exception {
     config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
@@ -95,27 +119,94 @@ public class PinotControllerModeTest extends ControllerTest {
     ControllerStarter helixControllerStarter = new ControllerStarter(config2);
     helixControllerStarter.start();
     HelixManager helixControllerManager = helixControllerStarter.getHelixControllerManager();
+    HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool();
     TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS,
         "Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
 
+    // Enable the lead controller resource.
+    helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
     // Starting a pinot only controller.
-    startController(config, false);
-    TestUtils.waitForCondition(aVoid -> _helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
-        "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
-    Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+    ControllerConf config3 = getDefaultControllerConfiguration();
+    config3.setHelixClusterName(getHelixClusterName());
+    config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+    ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3);
+    firstPinotOnlyController.start();
+    PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
+        firstPinotOnlyController.getHelixResourceManager();
+
+    TestUtils.waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(),
+        TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+    Assert.assertEquals(firstPinotOnlyController.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
 
     // Start a second Pinot only controller.
-    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
-    ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config);
+    ControllerConf config4 = getDefaultControllerConfiguration();
+    config4.setHelixClusterName(getHelixClusterName());
+    config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
 
+    ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4);
     secondControllerStarter.start();
     // Two controller instances assigned to cluster.
-    TestUtils.waitForCondition(aVoid -> _helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
-        "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
-
+    TestUtils
+        .waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
+            "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
+
+    // Disable lead controller resource, all the participants are in offline state (from slave state).
+    helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+          .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+          if (!"OFFLINE".equals(entry.getValue())) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " + TIMEOUT_IN_MS + "ms.");
+
+    // Re-enable lead controller resource, all the participants are in healthy state (either master or slave).
+    helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+    // Shut down one controller; it will be removed from the external view of the lead controller resource.
     secondControllerStarter.stop();
 
-    stopController();
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+          .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+        // Only 1 participant left in each partition, which will become the master.
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+          if (!"MASTER".equals(entry.getValue())) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " + TIMEOUT_IN_MS + "ms.");
+
+    // Shut down the only controller left; the partition map should be empty.
+    firstPinotOnlyController.stop();
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = helixAdmin
+          .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+        // There should be no participant in any of the partitions.
+        if (!stateMap.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " + TIMEOUT_IN_MS + "ms.");
+
     _controllerStarter = null;
     helixControllerStarter.stop();
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 573ab90..7d9f8d9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -19,13 +19,23 @@
 package org.apache.pinot.controller.helix.core;
 
 import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -45,12 +55,14 @@ import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
 import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
 
 
@@ -64,34 +76,34 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
   private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
   private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
+  private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
+  private static final long TIMEOUT_IN_MS = 10_000L;
 
   private final String _helixClusterName = getHelixClusterName();
 
   @BeforeClass
-  public void setUp()
-      throws Exception {
+  public void setUp() throws Exception {
     startZk();
     ControllerConf config = getDefaultControllerConfiguration();
     config.setTenantIsolationEnabled(false);
     startController(config);
 
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
-            false);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
-            BASE_SERVER_ADMIN_PORT);
+    ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
+        ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false);
+    ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR,
+        NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);
 
     // Create server tenant on all Servers
-    Tenant serverTenant =
-        new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES)
-            .build();
+    Tenant serverTenant = new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER)
+        .setOfflineInstances(NUM_INSTANCES)
+        .build();
     _helixResourceManager.createServerTenant(serverTenant);
+
+    _helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, true);
   }
 
   @Test
-  public void testGetInstanceEndpoints()
-      throws InvalidConfigException {
+  public void testGetInstanceEndpoints() throws InvalidConfigException {
     Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     BiMap<String, String> endpoints = _helixResourceManager.getDataInstanceAdminEndpoints(servers);
     for (int i = 0; i < NUM_INSTANCES; i++) {
@@ -100,8 +112,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   }
 
   @Test
-  public void testGetInstanceConfigs()
-      throws Exception {
+  public void testGetInstanceConfigs() throws Exception {
     Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     for (String server : servers) {
       InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server);
@@ -118,8 +129,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     zkClient.close();
   }
 
-  private void modifyExistingInstanceConfig(ZkClient zkClient)
-      throws InterruptedException {
+  private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException {
     String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES);
     String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
     Assert.assertTrue(zkClient.exists(instanceConfigPath));
@@ -150,8 +160,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     zkClient.writeData(instanceConfigPath, znRecord);
   }
 
-  private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
-      throws Exception {
+  private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception {
     int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES);
     String instanceName = "Server_localhost_" + String.valueOf(biggerRandomNumber);
     String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
@@ -184,17 +193,18 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   }
 
   @Test
-  public void testRebuildBrokerResourceFromHelixTags()
-      throws Exception {
+  public void testRebuildBrokerResourceFromHelixTags() throws Exception {
     // Create broker tenant on 3 Brokers
     Tenant brokerTenant =
         new Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build();
     _helixResourceManager.createBrokerTenant(brokerTenant);
 
     // Create the table
-    TableConfig tableConfig =
-        new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
-            .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
+    TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNumReplicas(3)
+        .setBrokerTenant(BROKER_TENANT_NAME)
+        .setServerTenant(SERVER_TENANT_NAME)
+        .build();
     _helixResourceManager.addTable(tableConfig);
 
     // Check that the BrokerResource ideal state has 3 Brokers assigned to the table
@@ -204,8 +214,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
 
     // Untag all Brokers assigned to broker tenant
     for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
 
@@ -228,8 +238,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
   }
 
   @Test
-  public void testRetrieveMetadata()
-      throws Exception {
+  public void testRetrieveMetadata() throws Exception {
     String segmentName = "testSegment";
 
     // Test retrieving OFFLINE segment ZK metadata
@@ -263,7 +272,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     }
   }
 
-  @Test void testRetrieveTenantNames() {
+  @Test
+  void testRetrieveTenantNames() {
     // Create broker tenant on 1 Broker
     Tenant brokerTenant =
         new Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build();
@@ -366,8 +376,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     }
 
     // Create broker tenant on 3 Brokers
-    Tenant brokerTenant =
-        new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+    Tenant brokerTenant = new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
     _helixResourceManager.createBrokerTenant(brokerTenant);
 
     // empty server instances list
@@ -452,18 +461,119 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
     }
 
     for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }
 
+  @Test
+  public void testLeadControllerResource()
+      throws Exception {
+    IdealState leadControllerResourceIdealState = _helixResourceManager.getHelixAdmin()
+        .getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    Assert.assertTrue(leadControllerResourceIdealState.isValid());
+    Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
+    Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
+        CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+    Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
+        CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+    Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
+        Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT));
+    Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(), IdealState.RebalanceMode.FULL_AUTO);
+    Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet(
+        leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
+
+    TestUtils
+        .waitForCondition(aVoid -> {
+              ExternalView leadControllerResourceExternalView = _helixResourceManager.getHelixAdmin()
+                  .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+              for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+                Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+                Map.Entry<String, String> entry = stateMap.entrySet().iterator().next();
+                boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + _controllerPort).equals(entry.getKey());
+                result &= "MASTER".equals(entry.getValue());
+                if (!result) {
+                  return false;
+                }
+              }
+              return true;
+            },
+            TIMEOUT_IN_MS, "Failed to assign controller hosts to lead controller resource in " + TIMEOUT_IN_MS + " ms.");
+  }
+
+  @Test
+  public void testLeadControllerAssignment() {
+    // Given a number of instances (from 1 to 10), make sure all the instances got assigned to lead controller resource.
+    for (int nInstances = 1; nInstances <= MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES; nInstances++) {
+      List<String> instanceNames = new ArrayList<>(nInstances);
+      List<Integer> ports = new ArrayList<>(nInstances);
+      for (int i = 0; i < nInstances; i++) {
+        instanceNames.add(PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + i);
+        ports.add(i);
+      }
+
+      List<String> partitions = new ArrayList<>(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+      for (int i = 0; i < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; i++) {
+        partitions.add(LEAD_CONTROLLER_RESOURCE_NAME + "_" + i);
+      }
+
+      LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+      states.put("OFFLINE", 0);
+      states.put("SLAVE", LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT - 1);
+      states.put("MASTER", 1);
+
+      CrushEdRebalanceStrategy crushEdRebalanceStrategy = new CrushEdRebalanceStrategy();
+      crushEdRebalanceStrategy.init(LEAD_CONTROLLER_RESOURCE_NAME, partitions, states, Integer.MAX_VALUE);
+
+      ClusterDataCache clusterDataCache = new ClusterDataCache();
+      PropertyKey.Builder keyBuilder = new PropertyKey.Builder(getHelixClusterName());
+      HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+      ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+      clusterDataCache.setClusterConfig(clusterConfig);
+
+      Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(nInstances);
+      for (int i = 0; i < nInstances; i++) {
+        String instanceName = instanceNames.get(i);
+        int port = ports.get(i);
+        instanceConfigMap.put(instanceName, new InstanceConfig(instanceName
+            + ", {HELIX_ENABLED=true, HELIX_ENABLED_TIMESTAMP=1559546216610, HELIX_HOST=Controller_localhost, HELIX_PORT="
+            + port + "}{}{TAG_LIST=[controller]}"));
+      }
+      clusterDataCache.setInstanceConfigMap(instanceConfigMap);
+      ZNRecord znRecord =
+          crushEdRebalanceStrategy.computePartitionAssignment(instanceNames, instanceNames, new HashMap<>(0),
+              clusterDataCache);
+
+      Assert.assertNotNull(znRecord);
+      Map<String, List<String>> listFields = znRecord.getListFields();
+      Assert.assertEquals(listFields.size(), NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+
+      Map<String, Integer> instanceToMasterAssignmentCountMap = new HashMap<>();
+      int maxCount = 0;
+      for (List<String> assignments : listFields.values()) {
+        Assert.assertEquals(assignments.size(), LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+        if (!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) {
+          instanceToMasterAssignmentCountMap.put(assignments.get(0), 1);
+        } else {
+          instanceToMasterAssignmentCountMap.put(assignments.get(0),
+              instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
+        }
+        maxCount = Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount);
+      }
+      Assert.assertEquals(instanceToMasterAssignmentCountMap.size(), nInstances,
+          "Not all the instances got assigned to the resource!");
+      for (Integer count : instanceToMasterAssignmentCountMap.values()) {
+        Assert.assertTrue((maxCount - count == 0 || maxCount - count == 1), "Instance assignment isn't distributed");
+      }
+    }
+  }
+
   @AfterMethod
   public void cleanUpBrokerTags() {
     // Untag all Brokers for other tests
     for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org