You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/01 21:51:08 UTC

[incubator-pinot] branch create-lead-controller-resource created (now 8682654)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch create-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 8682654  Create leadControllerResource in Helix cluster

This branch includes the following new commits:

     new 8682654  Create leadControllerResource in Helix cluster

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Create leadControllerResource in Helix cluster

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch create-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 868265437f90b4b649a554351d2458bd2c5e836f
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Apr 1 14:35:37 2019 -0700

    Create leadControllerResource in Helix cluster
---
 .../apache/pinot/common/utils/CommonConstants.java |  5 +++
 .../apache/pinot/controller/ControllerConf.java    | 17 +++++++-
 .../apache/pinot/controller/ControllerStarter.java | 34 ++++++++-------
 .../helix/core/PinotHelixResourceManager.java      | 27 +++++++++++-
 .../helix/core/util/HelixSetupUtils.java           | 49 ++++++++++++++++++++--
 5 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 137288c..b8dec72 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -35,8 +35,13 @@ public class CommonConstants {
 
     public static final String SERVER_INSTANCE_TYPE = "server";
     public static final String BROKER_INSTANCE_TYPE = "broker";
+    public static final String CONTROLLER_INSTANCE_TYPE = "controller";
 
     public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
+    public static final String LEAD_CONTROLLER_RESOURCE_INSTANCE = "leadControllerResource";
+    public static final int DEFAULT_NUMBER_OF_PARTICIPANTS_IN_LEAD_CONTROLLER_RESOURCE = 20;
+
+    public static final String MASTER_SLAVE_STATE_MODEL_DEFINITION = "MasterSlave";
 
     public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
     public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1faa949..07deca0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -32,7 +32,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
 import org.apache.pinot.filesystem.LocalPinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
   private static final String CONTROLLER_MODE = "controller.mode";
+  private static final String NUMBER_OF_CONTROLLER_REPLICAS = "controller.number.replicas";
 
   public enum ControllerMode {
     DUAL,
@@ -134,6 +134,7 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
 
   private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
+  private static final String ENABLE_LEAD_CONTROLLER_RESOURCE = "controller.enable.lead.controller.resource";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
@@ -151,7 +152,9 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
+  private static final boolean DEFAULT_ENABLE_LEAD_CONTROLLER_RESOURCE = false;
   private static final String DEFAULT_CONTROLLER_MODE = ControllerMode.DUAL.name();
+  private static final int DEFAULT_NUMBER_OF_CONTROLLER_REPLICAS = 3;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -578,6 +581,10 @@ public class ControllerConf extends PropertiesConfiguration {
     return getBoolean(ENABLE_BATCH_MESSAGE_MODE, DEFAULT_ENABLE_BATCH_MESSAGE_MODE);
   }
 
+  public boolean getEnableLeadControllerResource() {
+    return getBoolean(ENABLE_LEAD_CONTROLLER_RESOURCE, DEFAULT_ENABLE_LEAD_CONTROLLER_RESOURCE);
+  }
+
   public int getSegmentLevelValidationIntervalInSeconds() {
     return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
         ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
@@ -613,4 +620,12 @@ public class ControllerConf extends PropertiesConfiguration {
   public ControllerMode getControllerMode() {
     return ControllerMode.valueOf(getString(CONTROLLER_MODE, DEFAULT_CONTROLLER_MODE).toUpperCase());
   }
+
+  public void setNumberOfControllerReplicas(int numReplicas) {
+    setProperty(NUMBER_OF_CONTROLLER_REPLICAS, numReplicas);
+  }
+
+  public int getNumberOfControllerReplicas() {
+    return getInt(NUMBER_OF_CONTROLLER_REPLICAS, DEFAULT_NUMBER_OF_CONTROLLER_REPLICAS);
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 4051990..fefa790 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -92,6 +92,8 @@ public class ControllerStarter {
   private final String _instanceId;
   private final boolean _isUpdateStateModel;
   private final boolean _enableBatchMessageMode;
+  private final boolean _enableLeadControllerResource;
+  private final int _numberOfControllerReplicas;
   private final ControllerConf.ControllerMode _controllerMode;
 
   private HelixManager _helixControllerManager;
@@ -118,6 +120,8 @@ public class ControllerStarter {
     _instanceId = conf.getControllerHost() + "_" + conf.getControllerPort();
     _isUpdateStateModel = _config.isUpdateSegmentStateModel();
     _enableBatchMessageMode = _config.getEnableBatchMessageMode();
+    _enableLeadControllerResource = _config.getEnableLeadControllerResource();
+    _numberOfControllerReplicas = _config.getNumberOfControllerReplicas();
 
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
@@ -190,7 +194,8 @@ public class ControllerStarter {
         LOGGER.error("Invalid mode: " + _controllerMode);
     }
 
-    ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+    ServiceStatus
+        .setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
     _controllerMetrics.initializeGlobalMeters();
   }
 
@@ -198,7 +203,8 @@ public class ControllerStarter {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
     _helixControllerManager = HelixSetupUtils
-        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode,
+            _enableLeadControllerResource, _numberOfControllerReplicas);
 
     // Emit helix controller metrics
     _controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L);
@@ -412,18 +418,18 @@ public class ControllerStarter {
   }
 
   public void stop() {
-      switch (_controllerMode) {
-        case DUAL:
-          stopPinotController();
-          stopHelixController();
-          break;
-        case PINOT_ONLY:
-          stopPinotController();
-          break;
-        case HELIX_ONLY:
-          stopHelixController();
-          break;
-      }
+    switch (_controllerMode) {
+      case DUAL:
+        stopPinotController();
+        stopHelixController();
+        break;
+      case PINOT_ONLY:
+        stopPinotController();
+        break;
+      case HELIX_ONLY:
+        stopHelixController();
+        break;
+    }
   }
 
   private void stopHelixController() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 04b6f9b..8ae2a9f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -52,6 +52,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -59,6 +60,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.StateMachineEngine;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.OfflineTagConfig;
@@ -125,6 +127,7 @@ public class PinotHelixResourceManager {
   private final String _helixZkURL;
   private final String _helixClusterName;
   private final String _instanceId;
+  private final String _controllerParticipantInstanceId;
   private final String _dataDir;
   private final long _externalViewOnlineToOfflineTimeoutMillis;
   private final boolean _isSingleTenantCluster;
@@ -145,6 +148,7 @@ public class PinotHelixResourceManager {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
     _instanceId = controllerInstanceId;
+    _controllerParticipantInstanceId = CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId;
     _dataDir = dataDir;
     _externalViewOnlineToOfflineTimeoutMillis = externalViewOnlineToOfflineTimeoutMillis;
     _isSingleTenantCluster = isSingleTenantCluster;
@@ -179,6 +183,10 @@ public class PinotHelixResourceManager {
     _cacheInstanceConfigsDataAccessor =
         new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs, null,
             Collections.singletonList(instanceConfigs));
+
+    // Add instance group tag for controller
+    addInstanceGroupTag();
+
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -254,20 +262,35 @@ public class PinotHelixResourceManager {
    */
   private HelixManager registerAndConnectAsHelixParticipant() {
     HelixManager helixManager = HelixManagerFactory
-        .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
+        .getZKHelixManager(_helixClusterName, _controllerParticipantInstanceId,
             InstanceType.PARTICIPANT, _helixZkURL);
+
+    StateMachineEngine stateMach = helixManager.getStateMachineEngine();
+    MasterSlaveStateModelFactory factory = new MasterSlaveStateModelFactory();
+    stateMach.registerStateModelFactory(CommonConstants.Helix.MASTER_SLAVE_STATE_MODEL_DEFINITION, factory);
     try {
       helixManager.connect();
       return helixManager;
     } catch (Exception e) {
       String errorMsg =
-          String.format("Exception when connecting the instance %s as Participant to Helix.", _instanceId);
+          String.format("Exception when connecting the instance %s as Participant to Helix.", _controllerParticipantInstanceId);
       LOGGER.error(errorMsg, e);
       throw new RuntimeException(errorMsg);
     }
   }
 
   /**
+   * Add instance group tag for controller so that pinot controller can be assigned to lead controller resource.
+   */
+  private void addInstanceGroupTag() {
+    _helixZkManager.getClusterManagmentTool().enableInstance(_helixClusterName, _controllerParticipantInstanceId, true);
+    InstanceConfig instanceConfig = getHelixInstanceConfig(_controllerParticipantInstanceId);
+    instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+    HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+    accessor.setProperty(accessor.keyBuilder().instanceConfig(_controllerParticipantInstanceId), instanceConfig);
+  }
+
+  /**
    * Instance related APIs
    */
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index b8707f7..81c20b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -41,6 +41,7 @@ import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -64,10 +65,12 @@ public class HelixSetupUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class);
 
   public static synchronized HelixManager setup(String helixClusterName, String zkPath,
-      String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
+      String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode,
+      boolean enableLeadControllerResource, int numberOfControllerReplicas) {
 
     try {
-      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode);
+      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode,
+          enableLeadControllerResource, numberOfControllerReplicas);
     } catch (final Exception e) {
       LOGGER.error("Caught exception", e);
       return null;
@@ -82,7 +85,7 @@ public class HelixSetupUtils {
   }
 
   public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel,
-      boolean enableBatchMessageMode) {
+      boolean enableBatchMessageMode, boolean enableLeadControllerResource, int numberOfControllerReplicas) {
     final HelixAdmin admin = new ZKHelixAdmin(zkPath);
     final String segmentStateModelName =
         PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
@@ -110,6 +113,8 @@ public class HelixSetupUtils {
           zkClient.close();
         }
       }
+      createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode,
+          enableLeadControllerResource, numberOfControllerReplicas);
       return;
     }
 
@@ -154,6 +159,9 @@ public class HelixSetupUtils {
         .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
     admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
     initPropertyStorePath(helixClusterName, zkPath);
+
+    createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode, enableLeadControllerResource,
+        numberOfControllerReplicas);
     LOGGER.info("New Cluster setup completed... ********************************************** ");
   }
 
@@ -175,4 +183,39 @@ public class HelixSetupUtils {
     return HelixControllerMain
         .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE);
   }
+
+  private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+      boolean enableBatchMessageMode, boolean enableLeadControllerResource, int numberOfControllerReplicas) {
+    List<String> resources = admin.getResourcesInCluster(helixClusterName);
+    if (!resources.contains(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE)) {
+      LOGGER.info("Cluster {} doesn't contain {}. Creating one..", helixClusterName,
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE);
+
+      admin.addStateModelDef(helixClusterName, CommonConstants.Helix.MASTER_SLAVE_STATE_MODEL_DEFINITION,
+          MasterSlaveSMD.build());
+
+      HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(),
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE, helixClusterName, admin);
+      admin.addResource(helixClusterName, CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE,
+          CommonConstants.Helix.DEFAULT_NUMBER_OF_PARTICIPANTS_IN_LEAD_CONTROLLER_RESOURCE,
+          CommonConstants.Helix.MASTER_SLAVE_STATE_MODEL_DEFINITION, IdealState.RebalanceMode.FULL_AUTO.toString());
+
+      // set instance group tag for controller instance
+      IdealState leadControllerIdealState =
+          admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE);
+      leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+      admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE,
+          leadControllerIdealState);
+    }
+
+    LOGGER.info("Re-balance lead controller resource with replicas: {}", numberOfControllerReplicas);
+    admin.rebalance(helixClusterName, CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE,
+        numberOfControllerReplicas);
+
+    // Enable or disable lead controller resource.
+    LOGGER.info("{} lead controller resource.", enableLeadControllerResource ? "Enabling" : "Disabling");
+    admin.enableResource(helixClusterName, CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_INSTANCE,
+        enableLeadControllerResource);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org