You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/07 05:18:22 UTC
[incubator-pinot] branch master updated: Create leadControllerResource in helix cluster (#4047)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7566790 Create leadControllerResource in helix cluster (#4047)
7566790 is described below
commit 75667900a7ad4739069aa2220b778ab7d7b12a6b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Jun 6 22:18:16 2019 -0700
Create leadControllerResource in helix cluster (#4047)
* Create leadControllerResource in Helix cluster
* Separate Helix cluster creation logic
* Enable delay rebalance on the lead controller resource: set the rebalance delay to 5 minutes, minActiveReplicas to 0, and numReplicas to 1.
---
.../apache/pinot/common/config/TagNameUtils.java | 30 +--
.../apache/pinot/common/utils/CommonConstants.java | 10 +
.../apache/pinot/controller/ControllerStarter.java | 6 +-
.../helix/core/PinotHelixResourceManager.java | 76 +++---
.../helix/core/util/HelixSetupUtils.java | 286 ++++++++++++++-------
.../controller/helix/PinotControllerModeTest.java | 113 +++++++-
.../helix/core/PinotHelixResourceManagerTest.java | 180 ++++++++++---
7 files changed, 498 insertions(+), 203 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
index f1ab25e..a22e4fa 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.common.config;
-import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.utils.ServerType;
import org.apache.pinot.common.utils.TenantRole;
@@ -38,25 +37,20 @@ public class TagNameUtils {
return tenantName + "_" + TenantRole.BROKER.toString();
}
- public static boolean hasValidServerTagSuffix(String tagName) {
- if (tagName.endsWith(ServerType.REALTIME.toString()) || tagName.endsWith(ServerType.OFFLINE.toString())) {
- return true;
- }
- return false;
+ public static boolean isServerTag(String tagName) {
+ return isOfflineServerTag(tagName) || isRealtimeServerTag(tagName);
}
- public static TenantRole getTenantRoleFromTag(String tagName)
- throws InvalidConfigException {
- if (tagName.endsWith(ServerType.REALTIME.toString())) {
- return TenantRole.SERVER;
- }
- if (tagName.endsWith(ServerType.OFFLINE.toString())) {
- return TenantRole.SERVER;
- }
- if (tagName.endsWith(TenantRole.BROKER.toString())) {
- return TenantRole.BROKER;
- }
- throw new InvalidConfigException("Cannot identify tenant type from tag name : " + tagName);
+ public static boolean isOfflineServerTag(String tagName) {
+ return tagName.endsWith(ServerType.OFFLINE.toString());
+ }
+
+ public static boolean isRealtimeServerTag(String tagName) {
+ return tagName.endsWith(ServerType.REALTIME.toString());
+ }
+
+ public static boolean isBrokerTag(String tagName) {
+ return tagName.endsWith(TenantRole.BROKER.toString());
}
public static String getTagFromTenantAndServerType(String tenantName, ServerType type) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 915e96e..8132cdb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -37,8 +37,18 @@ public class CommonConstants {
public static final String SERVER_INSTANCE_TYPE = "server";
public static final String BROKER_INSTANCE_TYPE = "broker";
+ public static final String CONTROLLER_INSTANCE_TYPE = "controller";
public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
+ public static final String LEAD_CONTROLLER_RESOURCE_NAME = "leadControllerResource";
+
+ // More information on why these numbers are set can be found in the following doc:
+ // https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
+ public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 24;
+ public static final int LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT = 1;
+ public static final boolean ENABLE_DELAY_REBALANCE = true;
+ public static final int MIN_ACTIVE_REPLICAS = 0;
+ public static final long REBALANCE_DELAY_MS = 300_000L; // 5 minutes.
public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index f57d21f..c313616 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,8 +219,7 @@ public class ControllerStarter {
private void setUpHelixController() {
// Register and connect instance as Helix controller.
LOGGER.info("Starting Helix controller");
- _helixControllerManager = HelixSetupUtils
- .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+ _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId);
// Emit helix controller metrics
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -244,6 +243,9 @@ public class ControllerStarter {
throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
}
+ // Set up Pinot cluster in Helix
+ HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode);
+
// Start all components
initPinotFSFactory();
initSegmentFetcherFactory();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index aaa9ea8..877458d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,6 +53,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
@@ -60,6 +61,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.OfflineTagConfig;
@@ -92,7 +94,6 @@ import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnli
import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.SchemaUtils;
-import org.apache.pinot.common.utils.TenantRole;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -157,15 +158,10 @@ public class PinotHelixResourceManager {
_allowHLCTables = allowHLCTables;
}
- public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
- @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
- this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
- true, true);
- }
-
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- controllerConf.getControllerHost() + "_" + controllerConf.getControllerPort(), controllerConf.getDataDir(),
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + controllerConf.getControllerHost() + "_"
+ + controllerConf.getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(), controllerConf.getHLCTablesAllowed());
}
@@ -185,6 +181,10 @@ public class PinotHelixResourceManager {
_cacheInstanceConfigsDataAccessor =
new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) baseDataAccessor, instanceConfigs, null,
Collections.singletonList(instanceConfigs));
+
+ // Add instance group tag for controller
+ addInstanceGroupTagIfNeeded();
+
_keyBuilder = _helixDataAccessor.keyBuilder();
_segmentDeletionManager = new SegmentDeletionManager(_dataDir, _helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, _isSingleTenantCluster);
@@ -259,9 +259,13 @@ public class PinotHelixResourceManager {
* Register and connect to Helix cluster as PARTICIPANT role.
*/
private HelixManager registerAndConnectAsHelixParticipant() {
- HelixManager helixManager = HelixManagerFactory
- .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
- InstanceType.PARTICIPANT, _helixZkURL);
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
+
+ // Register the Master-Slave state model factory with the state machine engine; it is needed for calculating the participant assignment in the lead controller resource.
+ helixManager.getStateMachineEngine()
+ .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+
try {
helixManager.connect();
return helixManager;
@@ -274,6 +278,20 @@ public class PinotHelixResourceManager {
}
/**
+ * Add instance group tag for controller so that pinot controller can be assigned to lead controller resource.
+ */
+ private void addInstanceGroupTagIfNeeded() {
+ InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
+ if (!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
+ LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId,
+ CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+ accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId), instanceConfig);
+ }
+ }
+
+ /**
* Instance related APIs
*/
@@ -835,20 +853,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
- continue;
- }
- TenantRole tenantRole;
- try {
- tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
- if (tenantRole == TenantRole.BROKER) {
- tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
- }
- } catch (InvalidConfigException e) {
- LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, tag);
- continue;
+ if (TagNameUtils.isBrokerTag(tag)) {
+ tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
}
}
}
@@ -861,20 +867,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
- continue;
- }
- TenantRole tenantRole;
- try {
- tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
- if (tenantRole == TenantRole.SERVER) {
- tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
- }
- } catch (InvalidConfigException e) {
- LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, tag);
- continue;
+ if (TagNameUtils.isServerTag(tag)) {
+ tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
}
}
}
@@ -1176,7 +1170,7 @@ public class PinotHelixResourceManager {
if (tagOverrideConfig != null) {
String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
if (realtimeConsumingTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+ if (!TagNameUtils.isServerTag(realtimeConsumingTag)) {
throw new InvalidTableConfigException(
"Invalid realtime consuming tag: " + realtimeConsumingTag + " for table " + tableNameWithType
+ ". Must have suffix _REALTIME or _OFFLINE");
@@ -1190,7 +1184,7 @@ public class PinotHelixResourceManager {
String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
if (realtimeCompletedTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+ if (!TagNameUtils.isServerTag(realtimeCompletedTag)) {
throw new InvalidTableConfigException(
"Invalid realtime completed tag: " + realtimeCompletedTag + " for table " + tableNameWithType
+ ". Must have suffix _REALTIME or _OFFLINE");
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index b8707f7..f65a8ce 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.util;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,10 +29,10 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -53,6 +56,11 @@ import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS;
+
/**
* HelixSetupUtils handles how to create or get a helixCluster in controller.
@@ -60,119 +68,205 @@ import org.slf4j.LoggerFactory;
*
*/
public class HelixSetupUtils {
-
private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class);
public static synchronized HelixManager setup(String helixClusterName, String zkPath,
- String pinotControllerInstanceId, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
+ String helixControllerInstanceId) {
+ setupHelixCluster(helixClusterName, zkPath);
- try {
- createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, enableBatchMessageMode);
- } catch (final Exception e) {
- LOGGER.error("Caught exception", e);
- return null;
- }
+ return startHelixControllerInStandadloneMode(helixClusterName, zkPath, helixControllerInstanceId);
+ }
- try {
- return startHelixControllerInStandadloneMode(helixClusterName, zkPath, pinotControllerInstanceId);
- } catch (final Exception e) {
- LOGGER.error("Caught exception", e);
- return null;
+ /**
+ * Set up a brand new Helix cluster if it doesn't exist.
+ */
+ public static void setupHelixCluster(String helixClusterName, String zkPath) {
+ final HelixAdmin admin = new ZKHelixAdmin(zkPath);
+ if (admin.getClusters().contains(helixClusterName)) {
+ LOGGER.info("Helix cluster: {} already exists", helixClusterName);
+ return;
}
+ LOGGER.info("Creating a new Helix cluster: {}", helixClusterName);
+ admin.addCluster(helixClusterName, false);
+ LOGGER.info("New Cluster: {} created.", helixClusterName);
+ }
+
+ private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl,
+ String pinotControllerInstanceId) {
+ LOGGER.info("Starting Helix Standalone Controller ... ");
+ return HelixControllerMain
+ .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE);
}
- public static void createHelixClusterIfNeeded(String helixClusterName, String zkPath, boolean isUpdateStateModel,
+ /**
+ * Customizes existing Helix cluster to run Pinot components.
+ */
+ public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
boolean enableBatchMessageMode) {
final HelixAdmin admin = new ZKHelixAdmin(zkPath);
- final String segmentStateModelName =
- PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ Preconditions.checkState(admin.getClusters().contains(helixClusterName),
+ String.format("Helix cluster: %s hasn't been set up", helixClusterName));
- if (admin.getClusters().contains(helixClusterName)) {
- LOGGER.info("cluster already exists ********************************************* ");
- if (isUpdateStateModel) {
- final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
- List<String> states = curStateModelDef.getStatesPriorityList();
- if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
- LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
- return;
- } else {
- LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
- StateModelDefinition newStateModelDef =
- PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
- ZkClient zkClient = new ZkClient(zkPath);
- zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
- accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
- LOGGER.info("Completed updating statemodel {}", segmentStateModelName);
- zkClient.close();
- }
- }
- return;
- }
+ // Ensure auto join.
+ ensureAutoJoin(helixClusterName, admin);
- LOGGER.info("Creating a new cluster, as the helix cluster : " + helixClusterName
- + " was not found ********************************************* ");
- admin.addCluster(helixClusterName, false);
+ // Add segment state model definition if needed
+ addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel);
+
+ // Add broker resource if needed
+ createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
- LOGGER.info("Enable auto join.");
+ // Add lead controller resource if needed
+ createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode);
+
+ // Init property store if needed
+ initPropertyStoreIfNeeded(helixClusterName, zkPath);
+ }
+
+ private static void ensureAutoJoin(String helixClusterName, HelixAdmin admin) {
final HelixConfigScope scope =
new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
+ String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS;
+ List<String> keys = new ArrayList<>();
+ keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN);
+ keys.add(stateTransitionMaxThreads);
+ Map<String, String> configs = admin.getConfig(scope, keys);
+ if (!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN))) {
+ configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.TRUE.toString());
+ }
+ if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) {
+ configs.put(stateTransitionMaxThreads, String.valueOf(1));
+ }
+ admin.setConfig(scope, configs);
+ }
- final Map<String, String> props = new HashMap<String, String>();
- props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
- //we need only one segment to be loaded at a time
- props.put(MessageType.STATE_TRANSITION + "." + HelixTaskExecutor.MAX_THREADS, String.valueOf(1));
-
- admin.setConfig(scope, props);
-
- LOGGER.info(
- "Adding state model {} (with CONSUMED state) generated using {} **********************************************",
- segmentStateModelName, PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-
- // If this is a fresh cluster we are creating, then the cluster will see the CONSUMING state in the
- // state model. But then the servers will never be asked to go to that STATE (whether they have the code
- // to handle it or not) unil we complete the feature using low-level consumers and turn the feature on.
- admin.addStateModelDef(helixClusterName, segmentStateModelName,
- PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
- LOGGER.info("Adding state model definition named : "
- + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL
- + " generated using : " + PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()
- + " ********************************************** ");
-
- admin.addStateModelDef(helixClusterName,
- PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
- PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
- LOGGER.info("Adding empty ideal state for Broker!");
- HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
- helixClusterName, admin);
- IdealState idealState = PinotTableIdealStateBuilder
- .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
- admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
- initPropertyStorePath(helixClusterName, zkPath);
- LOGGER.info("New Cluster setup completed... ********************************************** ");
+ private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath,
+ boolean isUpdateStateModel) {
+ final String segmentStateModelName =
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName);
+ if (stateModelDefinition == null) {
+ LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName,
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
+ admin.addStateModelDef(helixClusterName, segmentStateModelName,
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+ } else if (isUpdateStateModel) {
+ final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName);
+ List<String> states = curStateModelDef.getStatesPriorityList();
+ if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) {
+ LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName);
+ } else {
+ LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName);
+ StateModelDefinition newStateModelDef =
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
+ ZkClient zkClient = new ZkClient(zkPath);
+ zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef);
+ LOGGER.info("Completed updating state model {}", segmentStateModelName);
+ zkClient.close();
+ }
+ }
}
- private static void initPropertyStorePath(String helixClusterName, String zkPath) {
- String propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, helixClusterName);
- ZkHelixPropertyStore<ZNRecord> propertyStore =
- new ZkHelixPropertyStore<ZNRecord>(zkPath, new ZNRecordSerializer(), propertyStorePath);
- propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
- propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
- propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+ private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+ boolean enableBatchMessageMode) {
+ // Add broker resource online offline state model definition if needed
+ StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName,
+ PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
+ if (brokerResourceStateModelDefinition == null) {
+ LOGGER.info("Adding state model definition named : {} generated using : {}",
+ PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+ PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
+ admin.addStateModelDef(helixClusterName,
+ PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+ PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+ }
+
+ // Create broker resource if needed.
+ IdealState brokerResourceIdealState =
+ admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ if (brokerResourceIdealState == null) {
+ LOGGER.info("Adding empty ideal state for Broker!");
+ HelixHelper
+ .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
+ admin);
+ IdealState idealState = PinotTableIdealStateBuilder
+ .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode);
+ admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+ }
}
- private static HelixManager startHelixControllerInStandadloneMode(String helixClusterName, String zkUrl,
- String pinotControllerInstanceId) {
- LOGGER.info("Starting Helix Standalone Controller ... ");
- return HelixControllerMain
- .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE);
+ private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin,
+ boolean enableBatchMessageMode) {
+ StateModelDefinition masterSlaveStateModelDefinition =
+ admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
+ if (masterSlaveStateModelDefinition == null) {
+ LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name,
+ MasterSlaveSMD.class.toString());
+ admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build());
+ }
+
+ IdealState leadControllerResourceIdealState =
+ admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+ if (leadControllerResourceIdealState == null) {
+ LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+ HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
+ // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
+ admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+ CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name,
+ IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName());
+
+ // Set instance group tag for lead controller resource.
+ IdealState leadControllerIdealState =
+ admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+ leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+ // The config below guarantees that, as long as the number of active replicas is no less than the minimum active replicas, no partition movement will happen.
+ // Set min active replicas to 0 and the rebalance delay to 5 minutes so that if any master goes offline, the Helix controller waits at most 5 minutes and then re-calculates the participant assignment.
+ // This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently.
+ // Plus, if a virtual id is applied to controller hosts, swapping hosts is easy because new hosts can use the same virtual id, and it takes the least effort to change the configs.
+ leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+ leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+ leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
+ admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
+
+ // Explicitly disable this resource when creating this new resource.
+ // When all the controllers are running the code with the logic to handle this resource, it can be enabled for backward compatibility.
+ // In the next major release, we can enable this resource by default, so that all the controller logic can be separated.
+ admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+ LOGGER.info("Re-balance lead controller resource with replicas: {}",
+ CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ // Set it to 1 so that there's only 1 instance (i.e. the master) shown in every partition.
+ admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+ CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ }
+ }
+
+ private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) {
+ String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath);
+ if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT);
+ }
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index f3da797..d91e612 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix;
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -62,18 +67,37 @@ public class PinotControllerModeTest extends ControllerTest {
config.setControllerMode(ControllerConf.ControllerMode.DUAL);
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+ // Helix cluster will be set up when starting the first controller.
startController(config);
TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
"Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.DUAL);
+ // Enable the lead controller resource.
+ _helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+ // Starting a second dual-mode controller. Helix cluster has already been set up.
+ ControllerConf controllerConfig = getDefaultControllerConfiguration();
+ controllerConfig.setHelixClusterName(getHelixClusterName());
+ controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+ controllerConfig.setControllerPort(
+ Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++));
+
+ ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig);
+ secondDualModeController.start();
+ TestUtils
+ .waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
+ TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(secondDualModeController.getControllerMode(), ControllerConf.ControllerMode.DUAL);
+
+ secondDualModeController.stop();
stopController();
_controllerStarter = null;
}
// TODO: enable it after removing ControllerLeadershipManager which requires both CONTROLLER and PARTICIPANT
// HelixManager
- @Test (enabled = false)
+ @Test(enabled = false)
public void testPinotOnlyController()
throws Exception {
config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
@@ -95,27 +119,94 @@ public class PinotControllerModeTest extends ControllerTest {
ControllerStarter helixControllerStarter = new ControllerStarter(config2);
helixControllerStarter.start();
HelixManager helixControllerManager = helixControllerStarter.getHelixControllerManager();
+ HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool();
TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS,
"Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+ // Enable the lead controller resource.
+ helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
// Starting a pinot only controller.
- startController(config, false);
- TestUtils.waitForCondition(aVoid -> _helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
- "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
- Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+ ControllerConf config3 = getDefaultControllerConfiguration();
+ config3.setHelixClusterName(getHelixClusterName());
+ config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+ config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+ ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3);
+ firstPinotOnlyController.start();
+ PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
+ firstPinotOnlyController.getHelixResourceManager();
+
+ TestUtils.waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(),
+ TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(firstPinotOnlyController.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
// Start a second Pinot only controller.
- config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
- ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config);
+ ControllerConf config4 = getDefaultControllerConfiguration();
+ config4.setHelixClusterName(getHelixClusterName());
+ config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+ config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+ ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4);
secondControllerStarter.start();
// Two controller instances assigned to cluster.
- TestUtils.waitForCondition(aVoid -> _helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
- "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
-
+ TestUtils
+ .waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
+ "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
+
+ // Disable lead controller resource, all the participants are in offline state (from slave state).
+ helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ if (!"OFFLINE".equals(entry.getValue())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " + TIMEOUT_IN_MS + "ms.");
+
+ // Re-enable lead controller resource, all the participants are in healthy state (either master or slave).
+ helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+ // Shut down one controller; it will be removed from the external view of the lead controller resource.
secondControllerStarter.stop();
- stopController();
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+ // Only 1 participant left in each partition, which will become the master.
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ if (!"MASTER".equals(entry.getValue())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " + TIMEOUT_IN_MS + "ms.");
+
+ // Shut down the only controller left; the state map of every partition should become empty.
+ firstPinotOnlyController.stop();
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView = helixAdmin
+ .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+ // There should be no participant in any of the partitions.
+ if (!stateMap.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " + TIMEOUT_IN_MS + "ms.");
+
_controllerStarter = null;
helixControllerStarter.stop();
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 573ab90..7d9f8d9 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -19,13 +19,23 @@
package org.apache.pinot.controller.helix.core;
import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -45,12 +55,14 @@ import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
@@ -64,34 +76,34 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
+ private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
+ private static final long TIMEOUT_IN_MS = 10_000L;
private final String _helixClusterName = getHelixClusterName();
@BeforeClass
- public void setUp()
- throws Exception {
+ public void setUp() throws Exception {
startZk();
ControllerConf config = getDefaultControllerConfiguration();
config.setTenantIsolationEnabled(false);
startController(config);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
- false);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
- BASE_SERVER_ADMIN_PORT);
+ ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
+ ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false);
+ ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR,
+ NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);
// Create server tenant on all Servers
- Tenant serverTenant =
- new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES)
- .build();
+ Tenant serverTenant = new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER)
+ .setOfflineInstances(NUM_INSTANCES)
+ .build();
_helixResourceManager.createServerTenant(serverTenant);
+
+ _helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, true);
}
@Test
- public void testGetInstanceEndpoints()
- throws InvalidConfigException {
+ public void testGetInstanceEndpoints() throws InvalidConfigException {
Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
BiMap<String, String> endpoints = _helixResourceManager.getDataInstanceAdminEndpoints(servers);
for (int i = 0; i < NUM_INSTANCES; i++) {
@@ -100,8 +112,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
@Test
- public void testGetInstanceConfigs()
- throws Exception {
+ public void testGetInstanceConfigs() throws Exception {
Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
for (String server : servers) {
InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server);
@@ -118,8 +129,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
zkClient.close();
}
- private void modifyExistingInstanceConfig(ZkClient zkClient)
- throws InterruptedException {
+ private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException {
String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES);
String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
Assert.assertTrue(zkClient.exists(instanceConfigPath));
@@ -150,8 +160,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
zkClient.writeData(instanceConfigPath, znRecord);
}
- private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
- throws Exception {
+ private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception {
int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES);
String instanceName = "Server_localhost_" + String.valueOf(biggerRandomNumber);
String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
@@ -184,17 +193,18 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
@Test
- public void testRebuildBrokerResourceFromHelixTags()
- throws Exception {
+ public void testRebuildBrokerResourceFromHelixTags() throws Exception {
// Create broker tenant on 3 Brokers
Tenant brokerTenant =
new Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build();
_helixResourceManager.createBrokerTenant(brokerTenant);
// Create the table
- TableConfig tableConfig =
- new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
- .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
+ TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNumReplicas(3)
+ .setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME)
+ .build();
_helixResourceManager.addTable(tableConfig);
// Check that the BrokerResource ideal state has 3 Brokers assigned to the table
@@ -204,8 +214,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
// Untag all Brokers assigned to broker tenant
for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
@@ -228,8 +238,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
@Test
- public void testRetrieveMetadata()
- throws Exception {
+ public void testRetrieveMetadata() throws Exception {
String segmentName = "testSegment";
// Test retrieving OFFLINE segment ZK metadata
@@ -263,7 +272,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
}
- @Test void testRetrieveTenantNames() {
+ @Test
+ void testRetrieveTenantNames() {
// Create broker tenant on 1 Broker
Tenant brokerTenant =
new Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build();
@@ -366,8 +376,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
// Create broker tenant on 3 Brokers
- Tenant brokerTenant =
- new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+ Tenant brokerTenant = new Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
_helixResourceManager.createBrokerTenant(brokerTenant);
// empty server instances list
@@ -452,18 +461,119 @@ public class PinotHelixResourceManagerTest extends ControllerTest {
}
for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(brokerTag));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
+ @Test
+ public void testLeadControllerResource()
+ throws Exception {
+ IdealState leadControllerResourceIdealState = _helixResourceManager.getHelixAdmin()
+ .getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ Assert.assertTrue(leadControllerResourceIdealState.isValid());
+ Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
+ Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
+ CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
+ CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+ Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
+ Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT));
+ Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(), IdealState.RebalanceMode.FULL_AUTO);
+ Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet(
+ leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
+
+ TestUtils
+ .waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView = _helixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition : leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition);
+ Map.Entry<String, String> entry = stateMap.entrySet().iterator().next();
+ boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + _controllerPort).equals(entry.getKey());
+ result &= "MASTER".equals(entry.getValue());
+ if (!result) {
+ return false;
+ }
+ }
+ return true;
+ },
+ TIMEOUT_IN_MS, "Failed to assign controller hosts to lead controller resource in " + TIMEOUT_IN_MS + " ms.");
+ }
+
+ @Test
+ public void testLeadControllerAssignment() {
+ // Given a number of instances (from 1 to 10), make sure all the instances got assigned to lead controller resource.
+ for (int nInstances = 1; nInstances <= MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES; nInstances++) {
+ List<String> instanceNames = new ArrayList<>(nInstances);
+ List<Integer> ports = new ArrayList<>(nInstances);
+ for (int i = 0; i < nInstances; i++) {
+ instanceNames.add(PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + i);
+ ports.add(i);
+ }
+
+ List<String> partitions = new ArrayList<>(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+ for (int i = 0; i < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; i++) {
+ partitions.add(LEAD_CONTROLLER_RESOURCE_NAME + "_" + i);
+ }
+
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+ states.put("OFFLINE", 0);
+ states.put("SLAVE", LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT - 1);
+ states.put("MASTER", 1);
+
+ CrushEdRebalanceStrategy crushEdRebalanceStrategy = new CrushEdRebalanceStrategy();
+ crushEdRebalanceStrategy.init(LEAD_CONTROLLER_RESOURCE_NAME, partitions, states, Integer.MAX_VALUE);
+
+ ClusterDataCache clusterDataCache = new ClusterDataCache();
+ PropertyKey.Builder keyBuilder = new PropertyKey.Builder(getHelixClusterName());
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
+ clusterDataCache.setClusterConfig(clusterConfig);
+
+ Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(nInstances);
+ for (int i = 0; i < nInstances; i++) {
+ String instanceName = instanceNames.get(i);
+ int port = ports.get(i);
+ instanceConfigMap.put(instanceName, new InstanceConfig(instanceName
+ + ", {HELIX_ENABLED=true, HELIX_ENABLED_TIMESTAMP=1559546216610, HELIX_HOST=Controller_localhost, HELIX_PORT="
+ + port + "}{}{TAG_LIST=[controller]}"));
+ }
+ clusterDataCache.setInstanceConfigMap(instanceConfigMap);
+ ZNRecord znRecord =
+ crushEdRebalanceStrategy.computePartitionAssignment(instanceNames, instanceNames, new HashMap<>(0),
+ clusterDataCache);
+
+ Assert.assertNotNull(znRecord);
+ Map<String, List<String>> listFields = znRecord.getListFields();
+ Assert.assertEquals(listFields.size(), NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+
+ Map<String, Integer> instanceToMasterAssignmentCountMap = new HashMap<>();
+ int maxCount = 0;
+ for (List<String> assignments : listFields.values()) {
+ Assert.assertEquals(assignments.size(), LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ if (!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) {
+ instanceToMasterAssignmentCountMap.put(assignments.get(0), 1);
+ } else {
+ instanceToMasterAssignmentCountMap.put(assignments.get(0),
+ instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
+ }
+ maxCount = Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount);
+ }
+ Assert.assertEquals(instanceToMasterAssignmentCountMap.size(), nInstances,
+ "Not all the instances got assigned to the resource!");
+ for (Integer count : instanceToMasterAssignmentCountMap.values()) {
+ Assert.assertTrue((maxCount - count == 0 || maxCount - count == 1), "Instance assignment isn't distributed");
+ }
+ }
+ }
+
@AfterMethod
public void cleanUpBrokerTags() {
// Untag all Brokers for other tests
for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org