You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/03/22 20:56:19 UTC
[incubator-pinot] branch master updated: Add controller mode logic in ControllerStarter (#3864)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c9fbd6a Add controller mode logic in ControllerStarter (#3864)
c9fbd6a is described below
commit c9fbd6a11ca18c6497462f4698130d2735addc77
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Fri Mar 22 13:56:13 2019 -0700
Add controller mode logic in ControllerStarter (#3864)
* Add controller mode logic
* Move controller mode out of resource manager
---
.../broker/broker/HelixBrokerStarterTest.java | 76 +++-----
.../apache/pinot/common/utils/CommonConstants.java | 1 +
.../apache/pinot/controller/ControllerConf.java | 17 ++
.../controller/ControllerLeadershipManager.java | 2 +
.../apache/pinot/controller/ControllerStarter.java | 205 ++++++++++++++++-----
.../helix/core/PinotHelixResourceManager.java | 32 +++-
.../realtime/PinotLLCRealtimeSegmentManager.java | 3 +
.../core/realtime/SegmentCompletionManager.java | 5 +
.../rebalance/RebalanceSegmentStrategyFactory.java | 5 +
.../PinotInstanceRestletResourceTest.java | 20 +-
.../controller/api/resources/TableViewsTest.java | 2 +
.../helix/ControllerPeriodicTaskStarterTest.java | 8 +-
.../pinot/controller/helix/ControllerTest.java | 40 +++-
.../controller/helix/PinotControllerModeTest.java | 115 ++++++++++++
.../controller/helix/PinotResourceManagerTest.java | 56 +++---
.../sharding/SegmentAssignmentStrategyTest.java | 88 ++++-----
.../validation/ValidationManagerTest.java | 57 +++---
...vertToRawIndexMinionClusterIntegrationTest.java | 4 +-
.../tests/OfflineClusterIntegrationTest.java | 3 +-
.../admin/command/StartControllerCommand.java | 7 +-
20 files changed, 511 insertions(+), 235 deletions(-)
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index a9a7568..08387a8 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -19,7 +19,6 @@
package org.apache.pinot.broker.broker;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.MetricsRegistry;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
@@ -31,10 +30,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.configuration.Configuration;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.broker.helix.DefaultHelixBrokerConfig;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
@@ -44,13 +41,10 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
@@ -58,17 +52,14 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-public class HelixBrokerStarterTest {
+public class HelixBrokerStarterTest extends ControllerTest {
private static final int SEGMENT_COUNT = 6;
- private PinotHelixResourceManager _pinotResourceManager;
- private static final String HELIX_CLUSTER_NAME = "TestHelixBrokerStarter";
private static final String RAW_DINING_TABLE_NAME = "dining";
private static final String DINING_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_DINING_TABLE_NAME);
private static final String COFFEE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType("coffee");
private final Configuration _pinotHelixBrokerProperties = DefaultHelixBrokerConfig.getDefaultBrokerConf();
private ZkClient _zkClient;
- private HelixAdmin _helixAdmin;
private HelixBrokerStarter _helixBrokerStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
@@ -77,43 +68,39 @@ public class HelixBrokerStarterTest {
throws Exception {
_zookeeperInstance = ZkStarter.startLocalZkServer();
_zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
- final String instanceId = "localhost_helixController";
- _pinotResourceManager =
- new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L,
- true, /*isUpdateStateModel=*/false, false);
- _pinotResourceManager.start();
- _helixAdmin = _pinotResourceManager.getHelixAdmin();
+
+ startController();
_pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943);
_pinotHelixBrokerProperties
.addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L);
_helixBrokerStarter =
- new HelixBrokerStarter(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
+ new HelixBrokerStarter(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, _pinotHelixBrokerProperties);
ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 5, true);
+ .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 5, true);
ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
+ .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
- while (_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size() == 0
- || _helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size() == 0) {
+ while (_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size() == 0
+ || _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size() == 0) {
Thread.sleep(100);
}
TableConfig offlineTableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_DINING_TABLE_NAME)
.setTimeColumnName("timeColumn").setTimeType("DAYS").build();
- _pinotResourceManager.addTable(offlineTableConfig);
+ _helixResourceManager.addTable(offlineTableConfig);
setupRealtimeTable();
for (int i = 0; i < 5; i++) {
- _pinotResourceManager
+ _helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
}
Thread.sleep(1000);
- ExternalView externalView = _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME);
+ ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
Assert.assertEquals(externalView.getPartitionSet().size(), 5);
}
@@ -134,18 +121,15 @@ public class HelixBrokerStarterTest {
setStreamConfigs(streamConfigs).build();
Schema schema = new Schema();
schema.setSchemaName(RAW_DINING_TABLE_NAME);
- _pinotResourceManager.addOrUpdateSchema(schema);
- // Fake an PinotLLCRealtimeSegmentManager instance: required for a realtime table creation.
- PinotLLCRealtimeSegmentManager
- .create(_pinotResourceManager, new ControllerConf(), new ControllerMetrics(new MetricsRegistry()));
- _pinotResourceManager.addTable(realtimeTimeConfig);
+ _helixResourceManager.addOrUpdateSchema(schema);
+ _helixResourceManager.addTable(realtimeTimeConfig);
_helixBrokerStarter.getHelixExternalViewBasedRouting()
- .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<InstanceConfig>());
+ .markDataResourceOnline(realtimeTimeConfig, null, new ArrayList<>());
}
@AfterTest
public void tearDown() {
- _pinotResourceManager.stop();
+ _helixResourceManager.stop();
_zkClient.close();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
}
@@ -155,12 +139,12 @@ public class HelixBrokerStarterTest {
throws Exception {
IdealState idealState;
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 6);
- idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
+ idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
ExternalView externalView =
- _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(externalView.getStateMap(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
HelixExternalViewBasedRouting helixExternalViewBasedRouting =
@@ -187,10 +171,10 @@ public class HelixBrokerStarterTest {
final String tableName = "coffee";
TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(tableName)
.setBrokerTenant("testBroker").setServerTenant("testServer").build();
- _pinotResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 6);
- idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 6);
+ idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(idealState.getInstanceSet(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
Assert.assertEquals(idealState.getInstanceSet(DINING_TABLE_NAME).size(), SEGMENT_COUNT);
@@ -199,13 +183,13 @@ public class HelixBrokerStarterTest {
@Override
public Boolean call()
throws Exception {
- return _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
+ return _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE)
.getStateMap(COFFEE_TABLE_NAME).size() == SEGMENT_COUNT;
}
}, 30000L);
externalView =
- _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ _helixAdmin.getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
Assert.assertEquals(externalView.getStateMap(COFFEE_TABLE_NAME).size(), SEGMENT_COUNT);
// Wait up to 30s for routing table to reach the expected size
@@ -225,7 +209,7 @@ public class HelixBrokerStarterTest {
brokerRoutingTableBuilderMap.get(DINING_TABLE_NAME).getRoutingTables().get(0).values().iterator().next().size(),
5);
- _pinotResourceManager
+ _helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(RAW_DINING_TABLE_NAME), "downloadUrl");
// Wait up to 30s for external view to reach the expected size
@@ -233,12 +217,12 @@ public class HelixBrokerStarterTest {
@Override
public Boolean call()
throws Exception {
- return _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME).getPartitionSet().size()
+ return _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME).getPartitionSet().size()
== SEGMENT_COUNT;
}
}, 30000L);
- externalView = _helixAdmin.getResourceExternalView(HELIX_CLUSTER_NAME, DINING_TABLE_NAME);
+ externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), DINING_TABLE_NAME);
Assert.assertEquals(externalView.getPartitionSet().size(), SEGMENT_COUNT);
tableArray = brokerRoutingTableBuilderMap.keySet().toArray();
Arrays.sort(tableArray);
@@ -271,14 +255,14 @@ public class HelixBrokerStarterTest {
Assert.assertEquals(tbi.getTimeValue(), Long.toString(currentTimeBoundary));
- List<String> segmentNames = _pinotResourceManager.getSegmentsFor(DINING_TABLE_NAME);
+ List<String> segmentNames = _helixResourceManager.getSegmentsFor(DINING_TABLE_NAME);
long endTime = currentTimeBoundary + 10;
// Refresh all 5 segments.
for (String segment : segmentNames) {
OfflineSegmentZKMetadata offlineSegmentZKMetadata =
- _pinotResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
+ _helixResourceManager.getOfflineSegmentZKMetadata(RAW_DINING_TABLE_NAME, segment);
Assert.assertNotNull(offlineSegmentZKMetadata);
- _pinotResourceManager.refreshSegment(
+ _helixResourceManager.refreshSegment(
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(RAW_DINING_TABLE_NAME, segment, endTime++),
offlineSegmentZKMetadata);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0777ad4..cf09060 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -31,6 +31,7 @@ public class CommonConstants {
public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
+ public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
public static final String SERVER_INSTANCE_TYPE = "server";
public static final String BROKER_INSTANCE_TYPE = "broker";
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1179996..1faa949 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.filesystem.LocalPinotFS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,13 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
+ private static final String CONTROLLER_MODE = "controller.mode";
+
+ public enum ControllerMode {
+ DUAL,
+ PINOT_ONLY,
+ HELIX_ONLY
+ }
public static class ControllerPeriodicTasksConf {
// frequency configs
@@ -143,6 +151,7 @@ public class ControllerConf extends PropertiesConfiguration {
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
+ private static final String DEFAULT_CONTROLLER_MODE = ControllerMode.DUAL.name();
private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
@@ -596,4 +605,12 @@ public class ControllerConf extends PropertiesConfiguration {
public long getPeriodicTaskInitialDelayInSeconds() {
return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
}
+
+ public void setControllerMode(ControllerMode controllerMode) {
+ setProperty(CONTROLLER_MODE, controllerMode.name());
+ }
+
+ public ControllerMode getControllerMode() {
+ return ControllerMode.valueOf(getString(CONTROLLER_MODE, DEFAULT_CONTROLLER_MODE).toUpperCase());
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
index ed6b71f..6ffa655 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
@@ -34,6 +34,7 @@ public class ControllerLeadershipManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class);
+ // TODO: fix the misuse of singleton.
private static ControllerLeadershipManager INSTANCE = null;
private HelixManager _helixManager;
@@ -75,6 +76,7 @@ public class ControllerLeadershipManager {
if (_amILeader) {
onBecomingNonLeader();
}
+ INSTANCE = null;
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 6416691..4051990 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -58,6 +58,8 @@ import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManag
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
+import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
+import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.controller.validation.BrokerResourceValidationManager;
import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
@@ -79,11 +81,21 @@ public class ControllerStarter {
private final ControllerConf _config;
private final ControllerAdminApiApplication _adminApp;
+ // TODO: rename this variable once it's full separated with Helix controller.
private final PinotHelixResourceManager _helixResourceManager;
private final MetricsRegistry _metricsRegistry;
private final ControllerMetrics _controllerMetrics;
private final ExecutorService _executorService;
+ private final String _helixZkURL;
+ private final String _helixClusterName;
+ private final String _instanceId;
+ private final boolean _isUpdateStateModel;
+ private final boolean _enableBatchMessageMode;
+ private final ControllerConf.ControllerMode _controllerMode;
+
+ private HelixManager _helixControllerManager;
+
// Can only be constructed after resource manager getting started
private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
@@ -95,23 +107,46 @@ public class ControllerStarter {
private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+ private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
public ControllerStarter(ControllerConf conf) {
_config = conf;
- _adminApp =
- new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
- // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link ControllerStarter::start()}
- _helixResourceManager = new PinotHelixResourceManager(_config);
+ _controllerMode = conf.getControllerMode();
+ // Helix related settings.
+ _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
+ _helixClusterName = _config.getHelixClusterName();
+ _instanceId = conf.getControllerHost() + "_" + conf.getControllerPort();
+ _isUpdateStateModel = _config.isUpdateSegmentStateModel();
+ _enableBatchMessageMode = _config.getEnableBatchMessageMode();
+
_metricsRegistry = new MetricsRegistry();
_controllerMetrics = new ControllerMetrics(_metricsRegistry);
- _executorService =
- Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+ _serviceStatusCallbackList = new ArrayList<>();
+ if (_controllerMode == ControllerConf.ControllerMode.HELIX_ONLY) {
+ _adminApp = null;
+ _helixResourceManager = null;
+ _executorService = null;
+ } else {
+ _adminApp =
+ new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
+ // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link ControllerStarter::start()}
+ _helixResourceManager = new PinotHelixResourceManager(_config);
+ _executorService =
+ Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+ }
}
public PinotHelixResourceManager getHelixResourceManager() {
return _helixResourceManager;
}
+ /**
+ * Gets the Helix Manager connected as Helix controller.
+ */
+ public HelixManager getHelixControllerManager() {
+ return _helixControllerManager;
+ }
+
public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
return _offlineSegmentIntervalChecker;
}
@@ -133,50 +168,75 @@ public class ControllerStarter {
}
public void start() {
- LOGGER.info("Starting Pinot controller");
-
+ LOGGER.info("Starting Pinot controller in mode: {}.", _controllerMode.name());
Utils.logVersions();
// Set up controller metrics
MetricsHelper.initializeMetrics(_config.subset(METRICS_REGISTRY_NAME));
MetricsHelper.registerMetricsRegistry(_metricsRegistry);
- Configuration pinotFSConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
- Configuration segmentFetcherFactoryConfig =
- _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
- Configuration pinotCrypterConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
-
- // Start all components
- LOGGER.info("Initializing PinotFSFactory");
- try {
- PinotFSFactory.init(pinotFSConfig);
- } catch (Exception e) {
- Utils.rethrowException(e);
+ switch (_controllerMode) {
+ case DUAL:
+ setUpHelixController();
+ setUpPinotController();
+ break;
+ case PINOT_ONLY:
+ setUpPinotController();
+ break;
+ case HELIX_ONLY:
+ setUpHelixController();
+ break;
+ default:
+ LOGGER.error("Invalid mode: " + _controllerMode);
}
- LOGGER.info("Initializing SegmentFetcherFactory");
- try {
- SegmentFetcherFactory.getInstance().init(segmentFetcherFactoryConfig);
- } catch (Exception e) {
- throw new RuntimeException("Caught exception while initializing SegmentFetcherFactory", e);
- }
+ ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+ _controllerMetrics.initializeGlobalMeters();
+ }
- LOGGER.info("Initializing PinotCrypterFactory");
- try {
- PinotCrypterFactory.init(pinotCrypterConfig);
- } catch (Exception e) {
- throw new RuntimeException("Caught exception while initializing PinotCrypterFactory", e);
+ private void setUpHelixController() {
+ // Register and connect instance as Helix controller.
+ LOGGER.info("Starting Helix controller");
+ _helixControllerManager = HelixSetupUtils
+ .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+
+ // Emit helix controller metrics
+ _controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L);
+ _controllerMetrics.addCallbackGauge("helix.leader", () -> _helixControllerManager.isLeader() ? 1L : 0L);
+ _helixControllerManager.addPreConnectCallback(
+ () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
+
+ _serviceStatusCallbackList.add(generateServiceStatusCallback(_helixControllerManager));
+ }
+
+ private void setUpPinotController() {
+ // Note: Right now we don't allow pinot-only mode to be used in production yet.
+ // Now we only have this mode used in tests.
+ // TODO: Remove this logic once all the helix separation PRs are committed.
+ if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) {
+ throw new RuntimeException("Pinot only controller currently isn't supported in production yet.");
}
+ // Start all components
+ initPinotFSFactory();
+ initSegmentFetcherFactory();
+ initPinotCrypterFactory();
+
LOGGER.info("Starting Pinot Helix resource manager and connecting to Zookeeper");
_helixResourceManager.start();
- final HelixManager helixManager = _helixResourceManager.getHelixZkManager();
+ HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
LOGGER.info("Init controller leadership manager");
- ControllerLeadershipManager.init(helixManager);
+ // Note: Currently leadership depends on helix controller, thus assign helixControllerManager to ControllerLeadershipManager.
+ // TODO: In the future when Helix separation is completed, leadership only depends on the master in leadControllerResource, and ControllerLeadershipManager will be removed.
+ if (_helixControllerManager != null) {
+ ControllerLeadershipManager.init(_helixControllerManager);
+ } else {
+ ControllerLeadershipManager.init(helixParticipantManager);
+ }
LOGGER.info("Starting task resource manager");
- _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixManager));
+ _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixParticipantManager));
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
@@ -191,7 +251,7 @@ public class ControllerStarter {
_controllerPeriodicTaskScheduler.init(controllerPeriodicTasks);
LOGGER.info("Creating rebalance segments factory");
- RebalanceSegmentStrategyFactory.createInstance(helixManager);
+ RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
String accessControlFactoryClass = _config.getAccessControlFactoryClass();
LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
@@ -233,8 +293,6 @@ public class ControllerStarter {
LOGGER.info("Controller services available at http://{}:{}/", _config.getControllerHost(),
_config.getControllerPort());
- _controllerMetrics.addCallbackGauge("helix.connected", () -> helixManager.isConnected() ? 1L : 0L);
- _controllerMetrics.addCallbackGauge("helix.leader", () -> helixManager.isLeader() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.exists", () -> new File(_config.getDataDir()).exists() ? 1L : 0L);
_controllerMetrics.addCallbackGauge("dataDir.fileOpLatencyMs", () -> {
File dataDir = new File(_config.getDataDir());
@@ -256,9 +314,13 @@ public class ControllerStarter {
}
});
- ServiceStatus.setServiceStatusCallback(new ServiceStatus.ServiceStatusCallback() {
+ _serviceStatusCallbackList.add(generateServiceStatusCallback(helixParticipantManager));
+ }
+
+ private ServiceStatus.ServiceStatusCallback generateServiceStatusCallback(HelixManager helixManager) {
+ return new ServiceStatus.ServiceStatusCallback() {
private boolean _isStarted = false;
- private String _statusDescription = "Helix ZK Not connected";
+ private String _statusDescription = "Helix ZK Not connected as " + helixManager.getInstanceType();
@Override
public ServiceStatus.Status getServiceStatus() {
@@ -285,11 +347,42 @@ public class ControllerStarter {
public String getStatusDescription() {
return _statusDescription;
}
- });
+ };
+ }
- helixManager.addPreConnectCallback(
- () -> _controllerMetrics.addMeteredGlobalValue(ControllerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
- _controllerMetrics.initializeGlobalMeters();
+ private void initPinotFSFactory() {
+ Configuration pinotFSConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY);
+ LOGGER.info("Initializing PinotFSFactory");
+ try {
+ PinotFSFactory.init(pinotFSConfig);
+ } catch (Exception e) {
+ Utils.rethrowException(e);
+ }
+ }
+
+ private void initSegmentFetcherFactory() {
+ Configuration segmentFetcherFactoryConfig =
+ _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY);
+ LOGGER.info("Initializing SegmentFetcherFactory");
+ try {
+ SegmentFetcherFactory.getInstance().init(segmentFetcherFactoryConfig);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while initializing SegmentFetcherFactory", e);
+ }
+ }
+
+ private void initPinotCrypterFactory() {
+ Configuration pinotCrypterConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+ LOGGER.info("Initializing PinotCrypterFactory");
+ try {
+ PinotCrypterFactory.init(pinotCrypterConfig);
+ } catch (Exception e) {
+ throw new RuntimeException("Caught exception while initializing PinotCrypterFactory", e);
+ }
+ }
+
+ public ControllerConf.ControllerMode getControllerMode() {
+ return _controllerMode;
}
@VisibleForTesting
@@ -319,7 +412,28 @@ public class ControllerStarter {
}
public void stop() {
+ switch (_controllerMode) {
+ case DUAL:
+ stopPinotController();
+ stopHelixController();
+ break;
+ case PINOT_ONLY:
+ stopPinotController();
+ break;
+ case HELIX_ONLY:
+ stopHelixController();
+ break;
+ }
+ }
+
+ private void stopHelixController() {
+ LOGGER.info("Disconnecting helix zk manager");
+ _helixControllerManager.disconnect();
+ }
+
+ private void stopPinotController() {
try {
+ // Stopping ControllerLeadershipManager has to be done before stopping HelixResourceManager.
LOGGER.info("Stopping controller leadership manager");
ControllerLeadershipManager.getInstance().stop();
@@ -339,12 +453,19 @@ public class ControllerStarter {
LOGGER.info("Stopping resource manager");
_helixResourceManager.stop();
+ LOGGER.info("Stopping rebalance segments factory");
+ RebalanceSegmentStrategyFactory.stop();
+
_executorService.shutdownNow();
} catch (final Exception e) {
LOGGER.error("Caught exception while shutting down", e);
}
}
+ public boolean isPinotOnlyModeSupported() {
+ return false;
+ }
+
public MetricsRegistry getMetricsRegistry() {
return _metricsRegistry;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 5e15bff..04b6f9b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -46,6 +46,7 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
@@ -103,7 +104,6 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -128,7 +128,6 @@ public class PinotHelixResourceManager {
private final String _dataDir;
private final long _externalViewOnlineToOfflineTimeoutMillis;
private final boolean _isSingleTenantCluster;
- private final boolean _isUpdateStateModel;
private final boolean _enableBatchMessageMode;
private HelixManager _helixZkManager;
@@ -142,37 +141,34 @@ public class PinotHelixResourceManager {
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
- boolean isSingleTenantCluster, boolean isUpdateStateModel, boolean enableBatchMessageMode) {
+ boolean isSingleTenantCluster, boolean enableBatchMessageMode) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_instanceId = controllerInstanceId;
_dataDir = dataDir;
_externalViewOnlineToOfflineTimeoutMillis = externalViewOnlineToOfflineTimeoutMillis;
_isSingleTenantCluster = isSingleTenantCluster;
- _isUpdateStateModel = isUpdateStateModel;
_enableBatchMessageMode = enableBatchMessageMode;
}
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@Nonnull String controllerInstanceId, @Nonnull String dataDir) {
this(zkURL, helixClusterName, controllerInstanceId, dataDir, DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
- false, false);
+ true);
}
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getControllerHost() + "_" + controllerConf.getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(), controllerConf.tenantIsolationEnabled(),
- controllerConf.isUpdateSegmentStateModel(), controllerConf.getEnableBatchMessageMode());
+ controllerConf.getEnableBatchMessageMode());
}
/**
* Create Helix cluster if needed, and then start a Pinot controller instance.
*/
public synchronized void start() {
- _helixZkManager = HelixSetupUtils
- .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
- Preconditions.checkNotNull(_helixZkManager);
+ _helixZkManager = registerAndConnectAsHelixParticipant();
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
@@ -254,6 +250,24 @@ public class PinotHelixResourceManager {
}
/**
+ * Registers this controller with the Helix cluster and connects to it in the PARTICIPANT role.
+ *
+ * <p>The Helix instance name is {@code CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE} followed by
+ * {@code _instanceId}.
+ *
+ * @return the connected Helix manager
+ * @throws RuntimeException if the connection attempt fails; the original exception is attached as the cause
+ */
+ private HelixManager registerAndConnectAsHelixParticipant() {
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
+ InstanceType.PARTICIPANT, _helixZkURL);
+ try {
+ helixManager.connect();
+ return helixManager;
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Exception when connecting the instance %s as Participant to Helix.", _instanceId);
+ LOGGER.error(errorMsg, e);
+ // Keep the underlying failure as the cause so callers can see why the connection failed.
+ throw new RuntimeException(errorMsg, e);
+ }
+ }
+
+ /**
* Instance related APIs
*/
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7a883ca..c26780b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,6 +111,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
+ // TODO: fix the misuse of singleton.
private static PinotLLCRealtimeSegmentManager INSTANCE = null;
private final HelixAdmin _helixAdmin;
@@ -179,6 +180,8 @@ public class PinotLLCRealtimeSegmentManager {
}
}
LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
+ INSTANCE = null;
+ SegmentCompletionManager.stop();
}
protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index b499e3e..7591898 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,6 +64,7 @@ public class SegmentCompletionManager {
ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
}
+ // TODO: fix the misuse of singleton.
private static SegmentCompletionManager _instance = null;
private final HelixManager _helixManager;
@@ -1115,6 +1116,10 @@ public class SegmentCompletionManager {
}
}
+ /**
+ * Clears the static {@code _instance} reference held by this class.
+ */
+ public static void stop() {
+ _instance = null;
+ }
+
@VisibleForTesting
protected boolean isLeader() {
return ControllerLeadershipManager.getInstance().isLeader();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
index 8e6ca40..30ac8af 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
@@ -28,6 +28,7 @@ import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy
*/
public class RebalanceSegmentStrategyFactory {
+ // TODO: fix the misuse of singleton.
private static RebalanceSegmentStrategyFactory INSTANCE = null;
private HelixManager _helixManager;
@@ -63,4 +64,8 @@ public class RebalanceSegmentStrategyFactory {
return new DefaultRebalanceSegmentStrategy(_helixManager);
}
}
+
+ /**
+ * Clears the static {@code INSTANCE} reference held by this factory.
+ */
+ public static void stop() {
+ INSTANCE = null;
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
index 0329199..28049ec 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResourceTest.java
@@ -46,9 +46,9 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
@Test
public void testInstanceListingAndCreation()
throws Exception {
- // Check that there are no instances
+ // Check that there is only one instance, which is the controller instance.
JsonNode instanceList = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
- assertEquals(instanceList.get("instances").size(), 0, "Expected empty instance list at beginning of test");
+ assertEquals(instanceList.get("instances").size(), 1, "Expected only one instance at beginning of test");
// Create untagged broker and server instances
ObjectNode brokerInstance =
@@ -59,17 +59,17 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
(ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"server\", \"port\":\"2345\"}");
sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString());
- // Check that there are two instances
+ // Check that there are three instances
TestUtils.waitForCondition(aVoid -> {
try {
- // Check that there are two instances
+ // Check that there are three instances
return
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size() == 2;
+ .size() == 3;
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 500L, 10_000L, "Expected two instances after creation of tagged instances");
+ }, 500L, 10_000L, "Expected three instances after creation of tagged instances");
// Create tagged broker and server instances
brokerInstance.put("tag", "someTag");
@@ -83,14 +83,14 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
// It may take some time for cache data accessor to update its data.
TestUtils.waitForCondition(aVoid -> {
try {
- // Check that there are four instances
+ // Check that there are five instances
return
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size() == 4;
+ .size() == 5;
} catch (Exception e) {
throw new RuntimeException(e);
}
- }, 500L, 10_000L, "Expected four instances after creation of tagged instances");
+ }, 500L, 10_000L, "Expected five instances after creation of tagged instances");
// Create duplicate broker and server instances (both calls should fail)
try {
@@ -107,11 +107,11 @@ public class PinotInstanceRestletResourceTest extends ControllerTest {
// Expected
}
- // Check that there are four instances
+ // Check that there are five instances
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
assertEquals(
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size(), 4, "Expected fore instances after creation of duplicate instances");
+ .size(), 5, "Expected five instances after creation of duplicate instances");
// Check that the instances are properly created
JsonNode instance = JsonUtils
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
index 18d1387..4ac9418 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
@@ -22,6 +22,7 @@ import java.net.HttpURLConnection;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
+import org.apache.helix.InstanceType;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.utils.CommonConstants;
@@ -65,6 +66,7 @@ public class TableViewsTest extends ControllerTest {
TableConfig tableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME)
.setNumReplicas(2).build();
+ Assert.assertEquals(_helixManager.getInstanceType(), InstanceType.CONTROLLER);
_helixResourceManager.addTable(tableConfig);
_helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, OFFLINE_SEGMENT_NAME),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
index d37cc65..8f72a59 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java
@@ -55,6 +55,7 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
_mockControllerStarter = new MockControllerStarter(config);
_mockControllerStarter.start();
_helixResourceManager = _mockControllerStarter.getHelixResourceManager();
+ _helixManager = _mockControllerStarter.getHelixControllerManager();
}
@Override
@@ -65,7 +66,12 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest {
_mockControllerStarter = null;
}
- private class MockControllerStarter extends ControllerStarter {
+ @Override
+ protected ControllerStarter getControllerStarter() {
+ return _mockControllerStarter;
+ }
+
+ private class MockControllerStarter extends TestOnlyControllerStarter {
private static final int NUM_PERIODIC_TASKS = 7;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index b3d6d31..20bdcfb 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -105,11 +105,27 @@ public abstract class ControllerTest {
return config;
}
+ /**
+ * {@link ControllerStarter} variant used by controller tests; it overrides
+ * {@link #isPinotOnlyModeSupported()} to return {@code true}.
+ */
+ public class TestOnlyControllerStarter extends ControllerStarter {
+
+ TestOnlyControllerStarter(ControllerConf conf) {
+ super(conf);
+ }
+
+ @Override
+ public boolean isPinotOnlyModeSupported() {
+ return true;
+ }
+ }
+
protected void startController() {
startController(getDefaultControllerConfiguration());
}
protected void startController(ControllerConf config) {
+ startController(config, true);
+ }
+
+ protected void startController(ControllerConf config, boolean deleteCluster) {
Assert.assertNotNull(config);
Assert.assertNull(_controllerStarter);
@@ -123,21 +139,31 @@ public abstract class ControllerTest {
String zkStr = config.getZkStr();
_zkClient = new ZkClient(zkStr);
- if (_zkClient.exists("/" + helixClusterName)) {
+ if (_zkClient.exists("/" + helixClusterName) && deleteCluster) {
_zkClient.deleteRecursive("/" + helixClusterName);
}
startControllerStarter(config);
- _helixManager = _helixResourceManager.getHelixZkManager();
- _helixAdmin = _helixResourceManager.getHelixAdmin();
- _propertyStore = _helixResourceManager.getPropertyStore();
+ // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode.
+ switch (getControllerStarter().getControllerMode()) {
+ case DUAL:
+ case PINOT_ONLY:
+ _helixAdmin = _helixResourceManager.getHelixAdmin();
+ _propertyStore = _helixResourceManager.getPropertyStore();
+ break;
+ case HELIX_ONLY:
+ _helixAdmin = _helixManager.getClusterManagmentTool();
+ _propertyStore = _helixManager.getHelixPropertyStore();
+ break;
+ }
}
protected void startControllerStarter(ControllerConf config) {
- _controllerStarter = new ControllerStarter(config);
+ _controllerStarter = new TestOnlyControllerStarter(config);
_controllerStarter.start();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
+ _helixManager = _controllerStarter.getHelixControllerManager();
}
protected void stopController() {
@@ -153,6 +179,10 @@ public abstract class ControllerTest {
_controllerStarter = null;
}
+ protected ControllerStarter getControllerStarter() {
+ return _controllerStarter;
+ }
+
protected Schema createDummySchema(String tableName) {
Schema schema = new Schema();
schema.setSchemaName(tableName);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
new file mode 100644
index 0000000..9bf70bd
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import org.apache.helix.HelixManager;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests that a controller can be started and stopped in each {@code ControllerConf.ControllerMode}.
+ *
+ * <p>All tests share one {@link ControllerConf}; each test moves the controller port forward before starting.
+ */
+public class PinotControllerModeTest extends ControllerTest {
+ private static final long TIMEOUT_IN_MS = 10_000L;
+ private ControllerConf config;
+ private int controllerPortOffset;
+
+ @BeforeClass
+ public void setUp() {
+ startZk();
+ config = getDefaultControllerConfiguration();
+ controllerPortOffset = 0;
+ }
+
+ /** Starts a HELIX_ONLY controller, waits for its Helix manager to connect, then stops it. */
+ @Test
+ public void testHelixOnlyController()
+ throws Exception {
+ config.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+ config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+ startController(config);
+ TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
+ "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+
+ Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.HELIX_ONLY);
+
+ stopController();
+ _controllerStarter = null;
+ }
+
+ /** Starts a DUAL mode controller, waits for its Helix manager to connect, then stops it. */
+ @Test
+ public void testDualModeController()
+ throws Exception {
+ config.setControllerMode(ControllerConf.ControllerMode.DUAL);
+ config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+ startController(config);
+ TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS,
+ "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.DUAL);
+
+ stopController();
+ _controllerStarter = null;
+ }
+
+ /**
+ * Checks that a PINOT_ONLY controller fails to start on its own, and starts once a HELIX_ONLY controller
+ * is running against the same cluster.
+ */
+ @Test
+ public void testPinotOnlyController()
+ throws Exception {
+
+ config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+ config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+
+ // Starting pinot only controller before starting helix controller should fail.
+ try {
+ startController(config);
+ Assert.fail("Starting pinot only controller should fail!");
+ } catch (RuntimeException e) {
+ _controllerStarter = null;
+ }
+
+ // Starting a helix controller.
+ ControllerConf config2 = getDefaultControllerConfiguration();
+ config2.setHelixClusterName(getHelixClusterName());
+ config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY);
+ config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+ ControllerStarter helixControllerStarter = new ControllerStarter(config2);
+ helixControllerStarter.start();
+ // Stop the helix controller even when a wait or assertion below fails, so it does not leak into other tests.
+ try {
+ HelixManager helixControllerManager = helixControllerStarter.getHelixControllerManager();
+ TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS,
+ "Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+
+ // Starting a pinot only controller.
+ startController(config, false);
+ TestUtils.waitForCondition(aVoid -> _helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
+ "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+
+ stopController();
+ _controllerStarter = null;
+ } finally {
+ helixControllerStarter.stop();
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ stopZk();
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 550fe52..e6a29c6 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -22,7 +22,6 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.TableConfig;
@@ -30,7 +29,6 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -38,14 +36,11 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class PinotResourceManagerTest {
- private final static String HELIX_CLUSTER_NAME = "testCluster";
+public class PinotResourceManagerTest extends ControllerTest {
private final static String TABLE_NAME = "testTable";
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private ZkClient _zkClient;
- private PinotHelixResourceManager _pinotHelixResourceManager;
- private HelixAdmin _helixAdmin;
@BeforeClass
public void setUp()
@@ -53,27 +48,22 @@ public class PinotResourceManagerTest {
_zookeeperInstance = ZkStarter.startLocalZkServer();
_zkClient = new ZkClient(ZkStarter.DEFAULT_ZK_STR);
- final String instanceId = "localhost_helixController";
- _pinotHelixResourceManager =
- new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, instanceId, null, 10000L, true,
- /*isUpdateStateModel=*/ false, false);
- _pinotHelixResourceManager.start();
- _helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
+ startController();
ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
+ .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR, 1, true);
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(), 1);
+ .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZkStarter.DEFAULT_ZK_STR, 1, true);
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(), 1);
Assert
- .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size(), 1);
+ .assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size(), 1);
Assert
- .assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(), 1);
+ .assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_REALTIME").size(), 1);
// Adding table
TableConfig tableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME).build();
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
}
@Test
@@ -83,18 +73,18 @@ public class PinotResourceManagerTest {
segmentZKMetadata.setSegmentName("testSegment");
// Segment ZK metadata does not exist
- Assert.assertFalse(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+ Assert.assertFalse(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
// Set segment ZK metadata
- Assert.assertTrue(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata));
+ Assert.assertTrue(_helixResourceManager.updateZkMetadata(segmentZKMetadata));
// Update ZK metadata
Assert.assertEquals(
- _pinotHelixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 0);
- Assert.assertTrue(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+ _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 0);
+ Assert.assertTrue(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
Assert.assertEquals(
- _pinotHelixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 1);
- Assert.assertFalse(_pinotHelixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
+ _helixResourceManager.getSegmentMetadataZnRecord("testTable_OFFLINE", "testSegment").getVersion(), 1);
+ Assert.assertFalse(_helixResourceManager.updateZkMetadata(segmentZKMetadata, 0));
}
/**
@@ -112,16 +102,16 @@ public class PinotResourceManagerTest {
// Basic add/delete case
for (int i = 1; i <= 2; i++) {
- _pinotHelixResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
+ _helixResourceManager.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
}
- IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+ IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
Set<String> segments = idealState.getPartitionSet();
Assert.assertEquals(segments.size(), 2);
for (String segmentName : segments) {
- _pinotHelixResourceManager.deleteSegment(offlineTableName, segmentName);
+ _helixResourceManager.deleteSegment(offlineTableName, segmentName);
}
- idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+ idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
Assert.assertEquals(idealState.getPartitionSet().size(), 0);
// Concurrent segment deletion
@@ -131,7 +121,7 @@ public class PinotResourceManagerTest {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
- _pinotHelixResourceManager
+ _helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME), "downloadUrl");
}
}
@@ -140,7 +130,7 @@ public class PinotResourceManagerTest {
addSegmentExecutor.shutdown();
addSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
- idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+ idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
Assert.assertEquals(idealState.getPartitionSet().size(), 30);
ExecutorService deleteSegmentExecutor = Executors.newFixedThreadPool(3);
@@ -148,20 +138,20 @@ public class PinotResourceManagerTest {
deleteSegmentExecutor.execute(new Runnable() {
@Override
public void run() {
- _pinotHelixResourceManager.deleteSegment(offlineTableName, segmentName);
+ _helixResourceManager.deleteSegment(offlineTableName, segmentName);
}
});
}
deleteSegmentExecutor.shutdown();
deleteSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES);
- idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, offlineTableName);
+ idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(), offlineTableName);
Assert.assertEquals(idealState.getPartitionSet().size(), 0);
}
@AfterClass
public void tearDown() {
- _pinotHelixResourceManager.stop();
+ _helixResourceManager.stop();
_zkClient.close();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 3879806..87fa60f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -37,9 +36,7 @@ import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerato
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
-import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.testng.Assert;
@@ -48,9 +45,8 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-public class SegmentAssignmentStrategyTest {
+public class SegmentAssignmentStrategyTest extends ControllerTest {
private final static String ZK_SERVER = ZkStarter.DEFAULT_ZK_STR;
- private final static String HELIX_CLUSTER_NAME = "TestSegmentAssignmentStrategyHelix";
private final static String TABLE_NAME_BALANCED = "testResourceBalanced";
private final static String TABLE_NAME_RANDOM = "testResourceRandom";
private final static String TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT = "testReplicaGroupPartitionAssignment";
@@ -60,10 +56,7 @@ public class SegmentAssignmentStrategyTest {
private static final Random random = new Random();
private final static String PARTITION_COLUMN = "memberId";
private final static int NUM_REPLICA = 2;
- private PinotHelixResourceManager _pinotHelixResourceManager;
private ZkClient _zkClient;
- private HelixManager _helixZkManager;
- private HelixAdmin _helixAdmin;
private final int _numServerInstance = 10;
private final int _numBrokerInstance = 1;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
@@ -74,40 +67,33 @@ public class SegmentAssignmentStrategyTest {
throws Exception {
_zookeeperInstance = ZkStarter.startLocalZkServer();
_zkClient = new ZkClient(ZK_SERVER);
- final String zkPath = "/" + HELIX_CLUSTER_NAME;
+ final String zkPath = "/" + getHelixClusterName();
if (_zkClient.exists(zkPath)) {
_zkClient.deleteRecursive(zkPath);
}
- final String instanceId = "localhost_helixController";
- _pinotHelixResourceManager = new PinotHelixResourceManager(ZK_SERVER, HELIX_CLUSTER_NAME, instanceId, null, 10000L,
- true, /*isUpdateStateModel=*/
- false, false);
- _pinotHelixResourceManager.start();
-
- final String helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(ZK_SERVER);
- _helixZkManager =
- HelixSetupUtils.setup(HELIX_CLUSTER_NAME, helixZkURL, instanceId, /*isUpdateStateModel=*/false, true);
- _helixAdmin = _helixZkManager.getClusterManagmentTool();
+ startController();
+ HelixManager helixZkManager = _helixResourceManager.getHelixZkManager();
+
_partitionAssignmentGenerator =
- new ReplicaGroupPartitionAssignmentGenerator(_helixZkManager.getHelixPropertyStore());
+ new ReplicaGroupPartitionAssignmentGenerator(helixZkManager.getHelixPropertyStore());
ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_SERVER, _numServerInstance, true);
+ .addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_SERVER, _numServerInstance, true);
ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_SERVER, _numBrokerInstance, true);
+ .addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_SERVER, _numBrokerInstance, true);
Thread.sleep(100);
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size(),
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_OFFLINE").size(),
_numServerInstance);
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_REALTIME").size(),
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_REALTIME").size(),
_numServerInstance);
- Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size(),
+ Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), "DefaultTenant_BROKER").size(),
_numBrokerInstance);
}
@AfterTest
public void tearDown() {
- _pinotHelixResourceManager.stop();
+ _helixResourceManager.stop();
_zkClient.close();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
}
@@ -119,15 +105,15 @@ public class SegmentAssignmentStrategyTest {
TableConfig tableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_RANDOM)
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
// Wait for the table addition
- while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_RANDOM)) {
+ while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_RANDOM)) {
Thread.sleep(100);
}
for (int i = 0; i < 10; ++i) {
- _pinotHelixResourceManager
+ _helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME_RANDOM), "downloadUrl");
// Wait for all segments appear in the external view
@@ -135,13 +121,13 @@ public class SegmentAssignmentStrategyTest {
Thread.sleep(100);
}
final Set<String> taggedInstances =
- _pinotHelixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+ _helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
final Map<String, Integer> instanceToNumSegmentsMap = new HashMap<>();
for (final String instance : taggedInstances) {
instanceToNumSegmentsMap.put(instance, 0);
}
IdealState idealState = _helixAdmin
- .getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
+ .getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_RANDOM));
Assert.assertEquals(idealState.getPartitionSet().size(), i + 1);
for (final String segmentId : idealState.getPartitionSet()) {
Assert.assertEquals(idealState.getInstanceStateMap(segmentId).size(), NUM_REPLICA);
@@ -158,11 +144,11 @@ public class SegmentAssignmentStrategyTest {
TableConfig tableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TABLE_NAME_BALANCED)
.setSegmentAssignmentStrategy("BalanceNumSegmentAssignmentStrategy").setNumReplicas(numReplicas).build();
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
int numSegments = 20;
for (int i = 0; i < numSegments; ++i) {
- _pinotHelixResourceManager
+ _helixResourceManager
.addNewSegment(SegmentMetadataMockUtils.mockSegmentMetadata(TABLE_NAME_BALANCED), "downloadUrl");
}
@@ -172,13 +158,13 @@ public class SegmentAssignmentStrategyTest {
}
final Set<String> taggedInstances =
- _pinotHelixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
+ _helixResourceManager.getAllInstancesForServerTenant("DefaultTenant_OFFLINE");
final Map<String, Integer> instance2NumSegmentsMap = new HashMap<>();
for (final String instance : taggedInstances) {
instance2NumSegmentsMap.put(instance, 0);
}
final IdealState idealState = _helixAdmin
- .getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
+ .getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
for (final String segmentId : idealState.getPartitionSet()) {
for (final String instance : idealState.getInstanceStateMap(segmentId).keySet()) {
instance2NumSegmentsMap.put(instance, instance2NumSegmentsMap.get(instance) + 1);
@@ -196,7 +182,7 @@ public class SegmentAssignmentStrategyTest {
Assert.assertTrue(instance2NumSegmentsMap.get(instance) <= maxNumSegmentsPerInstance,
"expected <=" + maxNumSegmentsPerInstance + " actual:" + instance2NumSegmentsMap.get(instance));
}
- _helixAdmin.dropResource(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
+ _helixAdmin.dropResource(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_BALANCED));
}
@Test
@@ -209,7 +195,7 @@ public class SegmentAssignmentStrategyTest {
TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE)
.setTableName(TABLE_NAME_REPLICA_GROUP_PARTITION_ASSIGNMENT)
.setSegmentAssignmentStrategy("RandomAssignmentStrategy").setNumReplicas(NUM_REPLICA).build();
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
// Check that partition assignment does not exist
ReplicaGroupPartitionAssignment partitionAssignment =
@@ -230,23 +216,23 @@ public class SegmentAssignmentStrategyTest {
replicaGroupTableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
// Check that the replica group partition assignment is created
- _pinotHelixResourceManager
+ _helixResourceManager
.setExistingTableConfig(replicaGroupTableConfig, tableNameWithType, CommonConstants.Helix.TableType.OFFLINE);
partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
Assert.assertTrue(partitionAssignment != null);
// After table deletion, check that the replica group partition assignment is deleted
- _pinotHelixResourceManager.deleteOfflineTable(rawTableName);
+ _helixResourceManager.deleteOfflineTable(rawTableName);
partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
Assert.assertTrue(partitionAssignment == null);
// Create a table with replica group
- _pinotHelixResourceManager.addTable(replicaGroupTableConfig);
+ _helixResourceManager.addTable(replicaGroupTableConfig);
partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
Assert.assertTrue(partitionAssignment != null);
// Check that the replica group partition assignment is deleted
- _pinotHelixResourceManager.deleteOfflineTable(rawTableName);
+ _helixResourceManager.deleteOfflineTable(rawTableName);
partitionAssignment = _partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(tableNameWithType);
Assert.assertTrue(partitionAssignment == null);
}
@@ -268,17 +254,17 @@ public class SegmentAssignmentStrategyTest {
tableConfig.getValidationConfig().setReplicaGroupStrategyConfig(replicaGroupStrategyConfig);
// Create the table
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
// Wait for table addition
- while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP)) {
+ while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP)) {
Thread.sleep(100);
}
// Upload segments
Map<Integer, Set<String>> segmentsPerPartition = ReplicaGroupTestUtils
.uploadMultipleSegmentsWithPartitionNumber(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP, 20, null,
- _pinotHelixResourceManager, 1);
+ _helixResourceManager, 1);
// Wait for all segments appear in the external view
while (!allSegmentsPushedToIdealState(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP, 20)) {
@@ -290,7 +276,7 @@ public class SegmentAssignmentStrategyTest {
ReplicaGroupPartitionAssignment partitionAssignment =
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(offlineTableName);
- IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME,
+ IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_TABLE_LEVEL_REPLICA_GROUP));
// Validate the segment assignment
@@ -328,17 +314,17 @@ public class SegmentAssignmentStrategyTest {
tableConfig.setIndexingConfig(indexingConfig);
// Add table
- _pinotHelixResourceManager.addTable(tableConfig);
+ _helixResourceManager.addTable(tableConfig);
// Wait for table addition
- while (!_pinotHelixResourceManager.hasOfflineTable(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP)) {
+ while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP)) {
Thread.sleep(100);
}
// Upload segments
Map<Integer, Set<String>> segmentsPerPartition = ReplicaGroupTestUtils
.uploadMultipleSegmentsWithPartitionNumber(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP, numSegments,
- PARTITION_COLUMN, _pinotHelixResourceManager, totalPartitionNumber);
+ PARTITION_COLUMN, _helixResourceManager, totalPartitionNumber);
// Wait for all segments appear in the external view
while (!allSegmentsPushedToIdealState(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP, numSegments)) {
@@ -350,7 +336,7 @@ public class SegmentAssignmentStrategyTest {
ReplicaGroupPartitionAssignment partitionAssignment =
_partitionAssignmentGenerator.getReplicaGroupPartitionAssignment(offlineTable);
- IdealState idealState = _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME,
+ IdealState idealState = _helixAdmin.getResourceIdealState(getHelixClusterName(),
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME_PARTITION_LEVEL_REPLICA_GROUP));
// Validate the segment assignment
@@ -361,7 +347,7 @@ public class SegmentAssignmentStrategyTest {
private boolean allSegmentsPushedToIdealState(String tableName, int segmentNum) {
IdealState idealState =
- _helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+ _helixAdmin.getResourceIdealState(getHelixClusterName(), TableNameBuilder.OFFLINE.tableNameWithType(tableName));
return idealState != null && idealState.getPartitionSet() != null
&& idealState.getPartitionSet().size() == segmentNum;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index 27b124f..e039d73 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -36,8 +36,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
-import org.apache.pinot.controller.helix.core.util.HelixSetupUtils;
+import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -54,18 +53,14 @@ import static org.testng.Assert.assertEquals;
/**
* Tests for the ValidationManagers.
*/
-public class ValidationManagerTest {
- private String HELIX_CLUSTER_NAME = "TestValidationManager";
-
+public class ValidationManagerTest extends ControllerTest {
private static final String ZK_STR = ZkStarter.DEFAULT_ZK_STR;
- private static final String CONTROLLER_INSTANCE_NAME = "localhost_11984";
private static final String TEST_TABLE_NAME = "testTable";
private static final String TEST_TABLE_TWO = "testTable2";
private static final String TEST_SEGMENT_NAME = "testSegment";
private ZkClient _zkClient;
- private PinotHelixResourceManager _pinotHelixResourceManager;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private TableConfig _offlineTableConfig;
private HelixManager _helixManager;
@@ -77,22 +72,16 @@ public class ValidationManagerTest {
_zkClient = new ZkClient(ZK_STR);
Thread.sleep(1000);
- _pinotHelixResourceManager =
- new PinotHelixResourceManager(ZK_STR, HELIX_CLUSTER_NAME, CONTROLLER_INSTANCE_NAME, null, 1000L,
- true, /*isUpdateStateModel=*/
- false, false);
- _pinotHelixResourceManager.start();
+ startController();
- ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_STR, 2, true);
- ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(HELIX_CLUSTER_NAME, ZK_STR, 2, true);
+ ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_STR, 2, true);
+ ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(getHelixClusterName(), ZK_STR, 2, true);
_offlineTableConfig =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2)
.build();
-
- final String instanceId = "localhost_helixController";
- _helixManager = HelixSetupUtils.setup(HELIX_CLUSTER_NAME, ZK_STR, instanceId, /*isUpdateStateModel=*/false, true);
- _pinotHelixResourceManager.addTable(_offlineTableConfig);
+ _helixManager = _helixResourceManager.getHelixZkManager();
+ _helixResourceManager.addTable(_offlineTableConfig);
}
@Test
@@ -102,16 +91,16 @@ public class ValidationManagerTest {
String partitionName = _offlineTableConfig.getTableName();
HelixAdmin helixAdmin = _helixManager.getClusterManagmentTool();
- IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+ IdealState idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
// Ensure that the broker resource is not rebuilt.
Assert.assertTrue(idealState.getInstanceSet(partitionName)
- .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
- _pinotHelixResourceManager.rebuildBrokerResourceFromHelixTags(partitionName);
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+ _helixResourceManager.rebuildBrokerResourceFromHelixTags(partitionName);
// Add another table that needs to be rebuilt
TableConfig offlineTableConfigTwo =
new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(TEST_TABLE_TWO).build();
- _pinotHelixResourceManager.addTable(offlineTableConfigTwo);
+ _helixResourceManager.addTable(offlineTableConfigTwo);
String partitionNameTwo = offlineTableConfigTwo.getTableName();
// Add a new broker manually such that the ideal state is not updated and ensure that rebuild broker resource is called
@@ -120,18 +109,18 @@ public class ValidationManagerTest {
instanceConfig.setInstanceEnabled(true);
instanceConfig.setHostName("Broker_localhost");
instanceConfig.setPort("2");
- helixAdmin.addInstance(HELIX_CLUSTER_NAME, instanceConfig);
- helixAdmin.addInstanceTag(HELIX_CLUSTER_NAME, instanceConfig.getInstanceName(),
+ helixAdmin.addInstance(getHelixClusterName(), instanceConfig);
+ helixAdmin.addInstanceTag(getHelixClusterName(), instanceConfig.getInstanceName(),
TagNameUtils.getBrokerTagForTenant(TagNameUtils.DEFAULT_TENANT_NAME));
- idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+ idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
// Assert that the two don't equal before the call to rebuild the broker resource.
Assert.assertTrue(!idealState.getInstanceSet(partitionNameTwo)
- .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
- _pinotHelixResourceManager.rebuildBrokerResourceFromHelixTags(partitionNameTwo);
- idealState = HelixHelper.getBrokerIdealStates(helixAdmin, HELIX_CLUSTER_NAME);
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+ _helixResourceManager.rebuildBrokerResourceFromHelixTags(partitionNameTwo);
+ idealState = HelixHelper.getBrokerIdealStates(helixAdmin, getHelixClusterName());
// Assert that the two do equal after being rebuilt.
Assert.assertTrue(idealState.getInstanceSet(partitionNameTwo)
- .equals(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
+ .equals(_helixResourceManager.getAllInstancesForBrokerTenant(TagNameUtils.DEFAULT_TENANT_NAME)));
}
@Test
@@ -139,9 +128,9 @@ public class ValidationManagerTest {
throws Exception {
SegmentMetadata segmentMetadata = SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
- _pinotHelixResourceManager.addNewSegment(segmentMetadata, "http://dummy/");
+ _helixResourceManager.addNewSegment(segmentMetadata, "http://dummy/");
OfflineSegmentZKMetadata offlineSegmentZKMetadata =
- _pinotHelixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
+ _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
long pushTime = offlineSegmentZKMetadata.getPushTime();
// Check that the segment has been pushed in the last 30 seconds
Assert.assertTrue(System.currentTimeMillis() - pushTime < 30_000);
@@ -150,10 +139,10 @@ public class ValidationManagerTest {
// Refresh the segment
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
- _pinotHelixResourceManager.refreshSegment(segmentMetadata, offlineSegmentZKMetadata);
+ _helixResourceManager.refreshSegment(segmentMetadata, offlineSegmentZKMetadata);
offlineSegmentZKMetadata =
- _pinotHelixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
+ _helixResourceManager.getOfflineSegmentZKMetadata(TEST_TABLE_NAME, TEST_SEGMENT_NAME);
// Check that the segment still has the same push time
assertEquals(offlineSegmentZKMetadata.getPushTime(), pushTime);
// Check that the refresh time is in the last 30 seconds
@@ -199,7 +188,7 @@ public class ValidationManagerTest {
@AfterClass
public void shutDown() {
- _pinotHelixResourceManager.stop();
+ _helixResourceManager.stop();
_zkClient.close();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
index 1efeeeb..e8cdb68 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ConvertToRawIndexMinionClusterIntegrationTest.java
@@ -171,8 +171,8 @@ public class ConvertToRawIndexMinionClusterIntegrationTest extends HybridCluster
@Test
public void testPinotHelixResourceManagerAPIs() {
// Instance APIs
- Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 6);
- Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 6);
+ Assert.assertEquals(_helixResourceManager.getAllInstances().size(), 7);
+ Assert.assertEquals(_helixResourceManager.getOnlineInstanceList().size(), 7);
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedBrokerInstanceList().size(), 0);
Assert.assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(), 0);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 172b2fa..85c8b15 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -516,7 +516,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
JsonNode instanceList = response.get("instances");
int numInstances = instanceList.size();
- assertEquals(numInstances, getNumBrokers() + getNumServers());
+ // The total number of instances is equal to the sum of num brokers, num servers and 1 controller.
+ assertEquals(numInstances, getNumBrokers() + getNumServers() + 1);
// Try to delete a server that does not exist
String deleteInstanceRequest = _controllerRequestURLBuilder.forInstanceDelete("potato");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
index db56895..45d36a2 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
@@ -51,7 +51,10 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
@Option(name = "-clusterName", required = false, metaVar = "<String>", usage = "Pinot cluster name.")
private String _clusterName = DEFAULT_CLUSTER_NAME;
- @Option(name = "-configFileName", required = false, metaVar = "<FilePathName>", usage = "Controller Starter config file", forbids = {"-controllerHost", "-controllerPort", "-dataDir", "-zkAddress", "-clusterName"})
+ @Option(name = "-controllerMode", required = false, metaVar = "<String>", usage = "Pinot controller mode.")
+ private ControllerConf.ControllerMode _controllerMode = ControllerConf.ControllerMode.DUAL;
+
+ @Option(name = "-configFileName", required = false, metaVar = "<FilePathName>", usage = "Controller Starter config file", forbids = {"-controllerHost", "-controllerPort", "-dataDir", "-zkAddress", "-clusterName", "-controllerMode"})
private String _configFileName;
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
@@ -150,6 +153,8 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
conf.setBrokerResourceValidationFrequencyInSeconds(3600);
+
+ conf.setControllerMode(_controllerMode);
}
LOGGER.info("Executing command: " + toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org