You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/10 23:19:36 UTC
[incubator-pinot] 01/01: Remove singleton for
PinotLLCRealtimeSegmentManager and SegmentCompletionManager
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch clean-up-singleton
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4b5df6ccd1ccd4febb19f8601cc78ae3c6fe5130
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Wed Apr 10 16:16:51 2019 -0700
Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager
---
.../apache/pinot/controller/ControllerStarter.java | 51 +++--
.../resources/LLCSegmentCompletionHandlers.java | 19 +-
.../helix/core/PinotHelixResourceManager.java | 124 ++++++------
.../helix/core/PinotTableIdealStateBuilder.java | 27 +--
.../realtime/PinotLLCRealtimeSegmentManager.java | 28 +--
.../core/realtime/SegmentCompletionManager.java | 71 +++----
.../controller/helix/PinotControllerModeTest.java | 9 +
.../helix/core/realtime/SegmentCompletionTest.java | 214 +++++++++++----------
8 files changed, 265 insertions(+), 278 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 10c1c00..0e59158 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -55,6 +55,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -107,6 +108,8 @@ public class ControllerStarter {
private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+ private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private SegmentCompletionManager _segmentCompletionManager;
private ControllerLeadershipManager _controllerLeadershipManager;
private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
@@ -191,15 +194,16 @@ public class ControllerStarter {
LOGGER.error("Invalid mode: " + _controllerMode);
}
- ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+ ServiceStatus.setServiceStatusCallback(
+ new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
_controllerMetrics.initializeGlobalMeters();
}
private void setUpHelixController() {
// Register and connect instance as Helix controller.
LOGGER.info("Starting Helix controller");
- _helixControllerManager = HelixSetupUtils
- .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+ _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel,
+ _enableBatchMessageMode);
// Emit helix controller metrics
_controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L);
@@ -241,7 +245,15 @@ public class ControllerStarter {
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
- PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+
+ _pinotLLCRealtimeSegmentManager =
+ new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
+ _controllerLeadershipManager);
+ // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
+ _helixResourceManager.setPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+ _segmentCompletionManager = new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics, _config.getSegmentCommitTimeoutSeconds(), _controllerLeadershipManager);
+
+
_realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
_realtimeSegmentsManager.start(_controllerMetrics);
@@ -279,6 +291,7 @@ public class ControllerStarter {
bind(_config).to(ControllerConf.class);
bind(_helixResourceManager).to(PinotHelixResourceManager.class);
bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+ bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
bind(_taskManager).to(PinotTaskManager.class);
bind(connectionManager).to(HttpConnectionManager.class);
bind(_executorService).to(Executor.class);
@@ -400,7 +413,7 @@ public class ControllerStarter {
_controllerMetrics);
periodicTasks.add(_offlineSegmentIntervalChecker);
_realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
- PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+ _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
@@ -414,18 +427,18 @@ public class ControllerStarter {
}
public void stop() {
- switch (_controllerMode) {
- case DUAL:
- stopPinotController();
- stopHelixController();
- break;
- case PINOT_ONLY:
- stopPinotController();
- break;
- case HELIX_ONLY:
- stopHelixController();
- break;
- }
+ switch (_controllerMode) {
+ case DUAL:
+ stopPinotController();
+ stopHelixController();
+ break;
+ case PINOT_ONLY:
+ stopPinotController();
+ break;
+ case HELIX_ONLY:
+ stopHelixController();
+ break;
+ }
}
private void stopHelixController() {
@@ -441,7 +454,7 @@ public class ControllerStarter {
// Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
// may interrupt the handlers waiting on an I/O.
- PinotLLCRealtimeSegmentManager.getInstance().stop();
+ _pinotLLCRealtimeSegmentManager.stop();
LOGGER.info("Closing PinotFS classes");
PinotFSFactory.shutdown();
@@ -465,7 +478,7 @@ public class ControllerStarter {
}
public boolean isPinotOnlyModeSupported() {
- return false;
+ return true;
}
public MetricsRegistry getMetricsRegistry() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 19e2123..bbd70b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -75,6 +75,9 @@ public class LLCSegmentCompletionHandlers {
@Inject
ControllerConf _controllerConf;
+ @Inject
+ SegmentCompletionManager _segmentCompletionManager;
+
@VisibleForTesting
public static String getScheme() {
return SCHEME;
@@ -104,7 +107,7 @@ public class LLCSegmentCompletionHandlers {
.withExtraTimeSec(extraTimeSec);
LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().extendBuildTime(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to extendBuildTime:{}", responseStr);
@@ -130,7 +133,7 @@ public class LLCSegmentCompletionHandlers {
.withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().segmentConsumed(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentConsumed:{}", responseStr);
return responseStr;
@@ -153,7 +156,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString());
SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentStoppedConsuming(requestParams);
+ _segmentCompletionManager.segmentStoppedConsuming(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
return responseStr;
@@ -183,7 +186,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
SegmentCompletionProtocol.Response response =
- SegmentCompletionManager.getInstance().segmentCommitStart(requestParams);
+ _segmentCompletionManager.segmentCommitStart(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitStart:{}", responseStr);
return responseStr;
@@ -230,7 +233,7 @@ public class LLCSegmentCompletionHandlers {
CommittingSegmentDescriptor committingSegmentDescriptor =
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
@@ -255,7 +258,7 @@ public class LLCSegmentCompletionHandlers {
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
- final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
+ final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager;
SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -297,7 +300,7 @@ public class LLCSegmentCompletionHandlers {
// check for existing segment file and remove it. So, the block cannot be removed altogether.
// For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
// be used.
- synchronized (SegmentCompletionManager.getInstance()) {
+ synchronized (_segmentCompletionManager) {
if (pinotFS.exists(segmentFileURI)) {
LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
segmentFileURI.toString(), instanceId, segmentName);
@@ -406,7 +409,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
}
- SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager
.segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
final String responseStr = response.toJsonString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1ad19fc..4216554 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -137,6 +137,7 @@ public class PinotHelixResourceManager {
private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
private Builder _keyBuilder;
private SegmentDeletionManager _segmentDeletionManager;
+ private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableRebalancer _tableRebalancer;
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@@ -253,9 +254,8 @@ public class PinotHelixResourceManager {
* Register and connect to Helix cluster as PARTICIPANT role.
*/
private HelixManager registerAndConnectAsHelixParticipant() {
- HelixManager helixManager = HelixManagerFactory
- .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
- InstanceType.PARTICIPANT, _helixZkURL);
+ HelixManager helixManager = HelixManagerFactory.getZKHelixManager(_helixClusterName,
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
try {
helixManager.connect();
return helixManager;
@@ -525,8 +525,8 @@ public class PinotHelixResourceManager {
// Segment is in error and we don't consider error state as different from target, therefore continue
} else {
// Will try to read data every 500 ms, only if external view not updated.
- Uninterruptibles
- .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
continue deadlineLoop;
}
}
@@ -536,8 +536,8 @@ public class PinotHelixResourceManager {
return true;
} else {
// Segment doesn't exist in EV, wait for a little bit
- Uninterruptibles
- .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+ Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS,
+ TimeUnit.MILLISECONDS);
}
}
@@ -579,8 +579,7 @@ public class PinotHelixResourceManager {
return PinotResourceManagerResponse.SUCCESS;
}
- public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType)
- throws Exception {
+ public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception {
TableConfig tableConfig;
try {
tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
@@ -602,8 +601,8 @@ public class PinotHelixResourceManager {
IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
Set<String> brokerInstancesInIdealState = brokerIdealState.getInstanceSet(tableNameWithType);
if (brokerInstancesInIdealState.equals(brokerInstances)) {
- return PinotResourceManagerResponse
- .success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
+ return PinotResourceManagerResponse.success(
+ "Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
}
// Update ideal state with the new broker instances
@@ -639,8 +638,8 @@ public class PinotHelixResourceManager {
tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE);
}
}
- _helixAdmin
- .setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+ _helixAdmin.setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
+ tableIdealState);
}
private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag,
@@ -788,8 +787,8 @@ public class PinotHelixResourceManager {
public boolean isServerTenantDeletable(String tenantName) {
Set<String> taggedInstances = new HashSet<>(
HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getOfflineTagForTenant(tenantName)));
- taggedInstances
- .addAll(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName)));
+ taggedInstances.addAll(
+ HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName)));
for (String resourceName : getAllResources()) {
if (!TableNameBuilder.isTableResource(resourceName)) {
continue;
@@ -812,9 +811,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
+ if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals(
+ CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
continue;
}
TenantRole tenantRole;
@@ -838,9 +836,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
+ if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals(
+ CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
continue;
}
TenantRole tenantRole;
@@ -1027,9 +1024,8 @@ public class PinotHelixResourceManager {
}
public List<String> getSchemaNames() {
- return _propertyStore
- .getChildNames(PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
- AccessOption.PERSISTENT);
+ return _propertyStore.getChildNames(
+ PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT);
}
/**
@@ -1037,8 +1033,7 @@ public class PinotHelixResourceManager {
* @throws InvalidTableConfigException
* @throws TableAlreadyExistsException for offline tables only if the table already exists
*/
- public void addTable(@Nonnull TableConfig tableConfig)
- throws IOException {
+ public void addTable(@Nonnull TableConfig tableConfig) throws IOException {
final String tableNameWithType = tableConfig.getTableName();
TenantConfig tenantConfig;
@@ -1107,9 +1102,8 @@ public class PinotHelixResourceManager {
}
// now lets build an ideal state
LOGGER.info("building empty ideal state for table : " + tableNameWithType);
- final IdealState offlineIdealState = PinotTableIdealStateBuilder
- .buildEmptyIdealStateFor(tableNameWithType, Integer.parseInt(segmentsConfig.getReplication()),
- _enableBatchMessageMode);
+ final IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
+ Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode);
LOGGER.info("adding table via the admin");
_helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState);
LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster");
@@ -1215,6 +1209,10 @@ public class PinotHelixResourceManager {
}
}
+ public void setPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ }
+
private void ensureRealtimeClusterIsSetUp(TableConfig config, String realtimeTableName,
IndexingConfig indexingConfig) {
StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
@@ -1234,7 +1232,7 @@ public class PinotHelixResourceManager {
// Only high-level consumer specified in the config.
createHelixEntriesForHighLevelConsumer(config, realtimeTableName, idealState);
// Clean up any LLC table if they are present
- PinotLLCRealtimeSegmentManager.getInstance().cleanupLLC(realtimeTableName);
+ _pinotLLCRealtimeSegmentManager.cleanupLLC(realtimeTableName);
}
}
@@ -1243,8 +1241,8 @@ public class PinotHelixResourceManager {
// Will either create idealstate entry, or update the IS entry with new segments
// (unless there are low-level segments already present)
if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
- PinotTableIdealStateBuilder
- .buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState, _enableBatchMessageMode);
+ PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager,
+ realtimeTableName, config, idealState, _enableBatchMessageMode);
LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
@@ -1255,9 +1253,8 @@ public class PinotHelixResourceManager {
private void createHelixEntriesForHighLevelConsumer(TableConfig config, String realtimeTableName,
IdealState idealState) {
if (idealState == null) {
- idealState = PinotTableIdealStateBuilder
- .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config, _helixZkManager, _propertyStore,
- _enableBatchMessageMode);
+ idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config,
+ _helixZkManager, _propertyStore, _enableBatchMessageMode);
LOGGER.info("Adding helix resource with empty HLC IdealState for {}", realtimeTableName);
_helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
} else {
@@ -1272,8 +1269,7 @@ public class PinotHelixResourceManager {
}
}
- public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type)
- throws IOException {
+ public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type) throws IOException {
if (type == TableType.REALTIME) {
ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig());
@@ -1284,7 +1280,8 @@ public class PinotHelixResourceManager {
ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
final String configReplication = config.getValidationConfig().getReplication();
- if (configReplication != null && !config.getValidationConfig().getReplication()
+ if (configReplication != null && !config.getValidationConfig()
+ .getReplication()
.equals(idealState.getReplicas())) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() {
@Nullable
@@ -1298,8 +1295,7 @@ public class PinotHelixResourceManager {
}
}
- public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs)
- throws Exception {
+ public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) throws Exception {
String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
if (tableConfig == null) {
@@ -1310,8 +1306,7 @@ public class PinotHelixResourceManager {
}
public void updateSegmentsValidationAndRetentionConfigFor(String tableName, TableType type,
- SegmentsValidationAndRetentionConfig newConfigs)
- throws Exception {
+ SegmentsValidationAndRetentionConfig newConfigs) throws Exception {
String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
if (tableConfig == null) {
@@ -1321,8 +1316,7 @@ public class PinotHelixResourceManager {
setExistingTableConfig(tableConfig, tableNameWithType, type);
}
- public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs)
- throws Exception {
+ public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) throws Exception {
String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
if (tableConfig == null) {
@@ -1449,8 +1443,8 @@ public class PinotHelixResourceManager {
}
}
- return (status) ? PinotResourceManagerResponse
- .success("Table " + tableName + " enabled (reset success = " + resetSuccessful + ")")
+ return (status) ? PinotResourceManagerResponse.success(
+ "Table " + tableName + " enabled (reset success = " + resetSuccessful + ")")
: PinotResourceManagerResponse.success("Table " + tableName + " disabled");
}
@@ -1654,8 +1648,8 @@ public class PinotHelixResourceManager {
if (customConfig != null) {
Map<String, String> customConfigMap = customConfig.getCustomConfigs();
if (customConfigMap != null) {
- if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean
- .valueOf(customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) {
+ if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean.valueOf(
+ customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) {
return false;
}
}
@@ -1727,11 +1721,10 @@ public class PinotHelixResourceManager {
Preconditions.checkNotNull(offlineTableConfig);
int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
- SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
- .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
- return segmentAssignmentStrategy
- .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
- numReplicas, serverTenant);
+ SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(
+ offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
+ return segmentAssignmentStrategy.getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore,
+ _helixClusterName, segmentMetadata, numReplicas, serverTenant);
}
/**
@@ -1796,9 +1789,9 @@ public class PinotHelixResourceManager {
LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView");
if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE",
_externalViewOnlineToOfflineTimeoutMillis, false)) {
- LOGGER
- .error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
- segmentName, _externalViewOnlineToOfflineTimeoutMillis);
+ LOGGER.error(
+ "External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
+ segmentName, _externalViewOnlineToOfflineTimeoutMillis);
return false;
}
@@ -1817,8 +1810,8 @@ public class PinotHelixResourceManager {
// Check that the ideal state has been written to ZK
updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
- LOGGER
- .info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName);
+ LOGGER.info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(),
+ segmentName);
for (String state : instanceStateMap.values()) {
if (!"ONLINE".equals(state)) {
LOGGER.error("Failed to write ONLINE ideal state!");
@@ -1966,8 +1959,8 @@ public class PinotHelixResourceManager {
Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
for (String state : instanceStateMap.values()) {
if (!status.equals(state)) {
- return PinotResourceManagerResponse
- .failure("Failed to update ideal state when setting status " + status + " for segment " + segmentName);
+ return PinotResourceManagerResponse.failure(
+ "Failed to update ideal state when setting status " + status + " for segment " + segmentName);
}
}
@@ -1977,8 +1970,8 @@ public class PinotHelixResourceManager {
}
}
- return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse
- .success("Segments " + segments + " now " + status)
+ return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse.success(
+ "Segments " + segments + " now " + status)
: PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
}
@@ -2106,8 +2099,8 @@ public class PinotHelixResourceManager {
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resource);
for (String partition : idealState.getPartitionSet()) {
if (idealState.getInstanceSet(partition).contains(instanceName)) {
- return PinotResourceManagerResponse
- .failure("Instance " + instanceName + " exists in ideal state for " + resource);
+ return PinotResourceManagerResponse.failure(
+ "Instance " + instanceName + " exists in ideal state for " + resource);
}
}
}
@@ -2186,8 +2179,7 @@ public class PinotHelixResourceManager {
@Nonnull
public RebalanceResult rebalanceTable(final String rawTableName, TableType tableType,
- Configuration rebalanceUserConfig)
- throws InvalidConfigException, TableNotFoundException {
+ Configuration rebalanceUserConfig) throws InvalidConfigException, TableNotFoundException {
TableConfig tableConfig = getTableConfig(rawTableName, tableType);
if (tableConfig == null) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 0e311e6..f21d4cb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -64,9 +64,11 @@ public class PinotTableIdealStateBuilder {
public static IdealState buildEmptyIdealStateFor(String tableName, int numCopies, boolean enableBatchMessageMode) {
final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableName);
final int replicas = numCopies;
- customModeIdealStateBuilder
- .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
- .setNumPartitions(0).setNumReplica(replicas).setMaxPartitionsPerNode(1);
+ customModeIdealStateBuilder.setStateModel(
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+ .setNumPartitions(0)
+ .setNumReplica(replicas)
+ .setMaxPartitionsPerNode(1);
final IdealState idealState = customModeIdealStateBuilder.build();
idealState.setInstanceGroupTag(tableName);
idealState.setBatchMessageMode(enableBatchMessageMode);
@@ -88,7 +90,8 @@ public class PinotTableIdealStateBuilder {
new CustomModeISBuilder(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
customModeIdealStateBuilder.setStateModel(
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL)
- .setMaxPartitionsPerNode(Integer.MAX_VALUE).setNumReplica(Integer.MAX_VALUE)
+ .setMaxPartitionsPerNode(Integer.MAX_VALUE)
+ .setNumReplica(Integer.MAX_VALUE)
.setNumPartitions(Integer.MAX_VALUE);
final IdealState idealState = customModeIdealStateBuilder.build();
idealState.setBatchMessageMode(enableBatchMessageMode);
@@ -118,8 +121,9 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
- public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
- IdealState idealState, boolean enableBatchMessageMode) {
+ public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+ String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+ boolean enableBatchMessageMode) {
// Validate replicasPerPartition here.
final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -136,9 +140,8 @@ public class PinotTableIdealStateBuilder {
if (idealState == null) {
idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
}
- final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance();
try {
- segmentManager.setupNewTable(realtimeTableConfig, idealState);
+ pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
} catch (InvalidConfigException e) {
throw new IllegalStateException("Caught exception when creating table " + realtimeTableName, e);
}
@@ -159,9 +162,11 @@ public class PinotTableIdealStateBuilder {
public static IdealState buildEmptyRealtimeIdealStateFor(String realtimeTableName, int replicaCount,
boolean enableBatchMessageMode) {
final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(realtimeTableName);
- customModeIdealStateBuilder
- .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
- .setNumPartitions(0).setNumReplica(replicaCount).setMaxPartitionsPerNode(1);
+ customModeIdealStateBuilder.setStateModel(
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+ .setNumPartitions(0)
+ .setNumReplica(replicaCount)
+ .setMaxPartitionsPerNode(1);
final IdealState idealState = customModeIdealStateBuilder.build();
idealState.setInstanceGroupTag(realtimeTableName);
idealState.setBatchMessageMode(enableBatchMessageMode);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 324b9ae..d21861c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,9 +111,6 @@ public class PinotLLCRealtimeSegmentManager {
*/
private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
- // TODO: fix the misuse of singleton.
- private static PinotLLCRealtimeSegmentManager INSTANCE = null;
-
private final HelixAdmin _helixAdmin;
private final HelixManager _helixManager;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -139,25 +136,13 @@ public class PinotLLCRealtimeSegmentManager {
return _controllerConf.generateVipUrl();
}
- public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
+ public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
+ this(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
controllerConf, controllerMetrics, controllerLeadershipManager);
}
- private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
- ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- if (INSTANCE != null) {
- throw new RuntimeException("Instance already created");
- }
- INSTANCE =
- new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
- controllerConf, controllerMetrics, controllerLeadershipManager);
- SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
- }
-
public void stop() {
_isStopping = true;
LOGGER
@@ -181,8 +166,6 @@ public class PinotLLCRealtimeSegmentManager {
}
}
LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
- INSTANCE = null;
- SegmentCompletionManager.stop();
}
protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
@@ -206,13 +189,6 @@ public class PinotLLCRealtimeSegmentManager {
_controllerLeadershipManager = controllerLeadershipManager;
}
- public static PinotLLCRealtimeSegmentManager getInstance() {
- if (INSTANCE == null) {
- throw new RuntimeException("Not yet created");
- }
- return INSTANCE;
- }
-
protected boolean isLeader() {
return _controllerLeadershipManager.isLeader();
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 618c4a6..5ae4b25 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,9 +64,6 @@ public class SegmentCompletionManager {
ABORTED, // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
}
- // TODO: fix the misuse of singleton.
- private static SegmentCompletionManager _instance = null;
-
private final HelixManager _helixManager;
// A map that holds the FSM for each segment.
private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>();
@@ -84,12 +81,15 @@ public class SegmentCompletionManager {
// TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
- protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+ public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+ ControllerMetrics controllerMetrics, int segmentCommitTimeoutSeconds,
+ ControllerLeadershipManager controllerLeadershipManager) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
_controllerLeadershipManager = controllerLeadershipManager;
+ SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
+ TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
}
public boolean isSplitCommitEnabled() {
@@ -100,25 +100,6 @@ public class SegmentCompletionManager {
return _segmentManager.getControllerVipUrl();
}
- public static SegmentCompletionManager create(HelixManager helixManager,
- PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
- if (_instance != null) {
- throw new RuntimeException("Cannot create multiple instances");
- }
- _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
- SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
- TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
- return _instance;
- }
-
- public static SegmentCompletionManager getInstance() {
- if (_instance == null) {
- throw new RuntimeException("Not yet created");
- }
- return _instance;
- }
-
protected long getCurrentTimeMs() {
return System.currentTimeMillis();
}
@@ -140,11 +121,11 @@ public class SegmentCompletionManager {
// Also good for synchronization, because it is possible that multiple threads take this path, and we don't want
// multiple instances of the FSM to be created for the same commit sequence at the same time.
final long endOffset = segmentMetadata.getEndOffset();
- fsm = SegmentCompletionFSM
- .fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
+ fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(),
+ endOffset);
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
- fsm = SegmentCompletionFSM
- .fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
+ fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName,
+ segmentMetadata.getNumReplicas());
} else {
// Segment is in the process of completing, and this is the first one to respond. Create fsm
fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
@@ -627,7 +608,8 @@ public class SegmentCompletionManager {
LOGGER.error("Segment upload failed");
return abortAndReturnFailed();
}
- SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
+ SegmentCompletionProtocol.Response response =
+ commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
return abortAndReturnFailed();
} else {
@@ -644,10 +626,11 @@ public class SegmentCompletionManager {
private SegmentCompletionProtocol.Response commit(String instanceId, long offset) {
long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 1000;
- LOGGER
- .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset, allowedBuildTimeSec);
+ LOGGER.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset,
+ allowedBuildTimeSec);
SegmentCompletionProtocol.Response.Params params =
- new SegmentCompletionProtocol.Response.Params().withOffset(offset).withBuildTimeSeconds(allowedBuildTimeSec)
+ new SegmentCompletionProtocol.Response.Params().withOffset(offset)
+ .withBuildTimeSeconds(allowedBuildTimeSec)
.withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
.withSplitCommit(_isSplitCommitEnabled);
if (_isSplitCommitEnabled) {
@@ -676,21 +659,21 @@ public class SegmentCompletionManager {
private SegmentCompletionProtocol.Response hold(String instanceId, long offset) {
LOGGER.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset);
- return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params()
- .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset));
+ return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(
+ SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset));
}
private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
_state = State.ABORTED;
- _segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+ _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(),
+ ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return hold(instanceId, offset);
}
private SegmentCompletionProtocol.Response abortAndReturnFailed() {
_state = State.ABORTED;
- _segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+ _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(),
+ ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return SegmentCompletionProtocol.RESP_FAILED;
}
@@ -962,9 +945,8 @@ public class SegmentCompletionManager {
private SegmentCompletionProtocol.Response processStoppedConsuming(String instanceId, long offset, String reason,
boolean createNew) {
- LOGGER
- .info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", instanceId,
- _segmentName, offset, _state, createNew, reason);
+ LOGGER.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}",
+ instanceId, _segmentName, offset, _state, createNew, reason);
_segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
return SegmentCompletionProtocol.RESP_PROCESSED;
}
@@ -1008,8 +990,7 @@ public class SegmentCompletionManager {
}
private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
- boolean isSplitCommit,
- CommittingSegmentDescriptor committingSegmentDescriptor) {
+ boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
boolean success;
String instanceId = reqParams.getInstanceId();
long offset = reqParams.getOffset();
@@ -1118,10 +1099,6 @@ public class SegmentCompletionManager {
}
}
- public static void stop() {
- _instance = null;
- }
-
@VisibleForTesting
protected boolean isLeader() {
return _controllerLeadershipManager.isLeader();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index 9bf70bd..b716e90 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -103,6 +103,15 @@ public class PinotControllerModeTest extends ControllerTest {
"Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
+ // Start the second Pinot only controller
+ config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+ ControllerStarter controllerStarter = new ControllerStarter(config);
+ controllerStarter.start();
+
+ Thread.sleep(100_000L);
+
+ controllerStarter.stop();
+
stopController();
_controllerStarter = null;
helixControllerStarter.stop();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 0dfe6b4..d004326 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -58,13 +58,11 @@ public class SegmentCompletionTest {
private final long s3Offset = 30L;
@BeforeMethod
- public void testCaseSetup()
- throws Exception {
+ public void testCaseSetup() throws Exception {
testCaseSetup(true, true);
}
- public void testCaseSetup(boolean isLeader, boolean isConnected)
- throws Exception {
+ public void testCaseSetup(boolean isLeader, boolean isConnected) throws Exception {
segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
final int partitionId = 23;
@@ -78,8 +76,7 @@ public class SegmentCompletionTest {
metadata.setNumReplicas(3);
segmentManager._segmentMetadata = metadata;
- segmentCompletionMgr =
- new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+ segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
segmentManager._segmentCompletionMgr = segmentCompletionMgr;
Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -93,8 +90,7 @@ public class SegmentCompletionTest {
// Simulate a new controller taking over with an empty completion manager object,
// but segment metadata is fine in zk
- private void replaceSegmentCompletionManager()
- throws Exception {
+ private void replaceSegmentCompletionManager() throws Exception {
long oldSecs = segmentCompletionMgr._secconds;
segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, true, true);
segmentCompletionMgr._secconds = oldSecs;
@@ -104,8 +100,7 @@ public class SegmentCompletionTest {
}
@Test
- public void testStoppedConsumeDuringCompletion()
- throws Exception {
+ public void testStoppedConsumeDuringCompletion() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
final String reason = "IAmLazy";
@@ -154,8 +149,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -172,8 +167,7 @@ public class SegmentCompletionTest {
}
@Test
- public void testStoppedConsumeBeforeHold()
- throws Exception {
+ public void testStoppedConsumeBeforeHold() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
final String reason = "IAmLazy";
@@ -224,8 +218,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -243,13 +237,14 @@ public class SegmentCompletionTest {
// s2 sends stoppedConsuming message, but then may have gotten restarted, so eventually we complete the segment.
@Test
- public void testHappyPathAfterStoppedConsuming()
- throws Exception {
+ public void testHappyPathAfterStoppedConsuming() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
segmentCompletionMgr._secconds = 5;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withReason("some reason");
response = segmentCompletionMgr.segmentStoppedConsuming(params);
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
@@ -262,21 +257,18 @@ public class SegmentCompletionTest {
}
@Test
- public void testHappyPath()
- throws Exception {
+ public void testHappyPath() throws Exception {
testHappyPath(5L);
}
// Tests happy path with split commit protocol
@Test
- public void testHappyPathSplitCommit()
- throws Exception {
+ public void testHappyPathSplitCommit() throws Exception {
testHappyPathSplitCommit(5L);
}
@Test
- public void testExceptionInConsumedMessage()
- throws Exception {
+ public void testExceptionInConsumedMessage() throws Exception {
segmentManager._segmentMetadata = null;
SegmentCompletionProtocol.Response response;
@@ -289,8 +281,7 @@ public class SegmentCompletionTest {
// When commit segment file fails, makes sure the fsm aborts and that a segment can successfully commit afterwards.
@Test
- public void testCommitSegmentFileFail()
- throws Exception {
+ public void testCommitSegmentFileFail() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -333,10 +324,12 @@ public class SegmentCompletionTest {
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
// s2's file does not successfully commit
segmentCompletionMgr._secconds += 5;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("doNotCommitMe");
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have aborted
@@ -367,17 +360,18 @@ public class SegmentCompletionTest {
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
// s2's file successfully commits
segmentCompletionMgr._secconds += 5;
- params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s3)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
// And the FSM should be removed.
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
}
- public void testHappyPathSplitCommit(long startTime)
- throws Exception {
+ public void testHappyPathSplitCommit(long startTime) throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -420,10 +414,12 @@ public class SegmentCompletionTest {
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
segmentCompletionMgr._secconds += 5;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -441,8 +437,7 @@ public class SegmentCompletionTest {
// Tests that we abort when the server instance comes back with a different offset than it is told to commit with
@Test
- public void testCommitDifferentOffsetSplitCommit()
- throws Exception {
+ public void testCommitDifferentOffsetSplitCommit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -470,10 +465,12 @@ public class SegmentCompletionTest {
// s3 comes back to try to commit with a different offset
segmentCompletionMgr._secconds += 5;
- params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s3)
+ .withOffset(s3Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have disappeared from the map
@@ -489,8 +486,7 @@ public class SegmentCompletionTest {
Assert.assertTrue(fsmMap.containsKey(segmentNameStr));
}
- public void testHappyPath(long startTime)
- throws Exception {
+ public void testHappyPath(long startTime) throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -532,8 +528,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -550,50 +546,57 @@ public class SegmentCompletionTest {
}
@Test
- public void testControllerNotConnected()
- throws Exception {
+ public void testControllerNotConnected() throws Exception {
testCaseSetup(true, false); // Leader but not connected
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
segmentCompletionMgr._secconds = 5L;
- params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s1)
+ .withOffset(s1Offset)
+ .withSegmentName(segmentNameStr)
.withReason("rowLimit");
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.NOT_LEADER);
}
@Test
- public void testWinnerOnTimeLimit()
- throws Exception {
+ public void testWinnerOnTimeLimit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
segmentCompletionMgr._secconds = 10L;
- params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s1)
+ .withOffset(s1Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
}
@Test
- public void testWinnerOnRowLimit()
- throws Exception {
+ public void testWinnerOnRowLimit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
segmentCompletionMgr._secconds = 10L;
- params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s1)
+ .withOffset(s1Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
// S2 comes with the same offset as S1
segmentCompletionMgr._secconds += 1;
- params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s1Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
segmentCompletionMgr._secconds += 1;
// S3 comes with a different offset and without row limit. we ask it to hold even though it is higher.
- params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s3)
+ .withOffset(s3Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
@@ -605,16 +608,20 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// We ask S2 to keep the segment
- params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s1Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP);
// And we ask S3 to discard because it was ahead.
- params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s3)
+ .withOffset(s3Offset)
+ .withSegmentName(segmentNameStr)
.withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD);
@@ -622,19 +629,16 @@ public class SegmentCompletionTest {
// Tests that when server is delayed(Stalls for a hour), when server comes back, we commit successfully.
@Test
- public void testDelayedServerSplitCommit()
- throws Exception {
+ public void testDelayedServerSplitCommit() throws Exception {
testDelayedServer(true);
}
@Test
- public void testDelayedServer()
- throws Exception {
+ public void testDelayedServer() throws Exception {
testDelayedServer(false);
}
- public void testDelayedServer(boolean isSplitCommit)
- throws Exception {
+ public void testDelayedServer(boolean isSplitCommit) throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -670,7 +674,9 @@ public class SegmentCompletionTest {
response = segmentCompletionMgr.segmentCommitStart(params);
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
segmentCompletionMgr._secconds += 5;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("location");
response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -689,8 +695,7 @@ public class SegmentCompletionTest {
// We test the case where all servers go silent after controller asks one of them commit
@Test
- public void testDeadServers()
- throws Exception {
+ public void testDeadServers() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -737,8 +742,7 @@ public class SegmentCompletionTest {
// We test the case when the committer is asked to commit, but they never come back.
@Test
- public void testCommitterFailure()
- throws Exception {
+ public void testCommitterFailure() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -807,8 +811,7 @@ public class SegmentCompletionTest {
}
@Test
- public void testHappyPathSlowCommit()
- throws Exception {
+ public void testHappyPathSlowCommit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 1509242466s;
@@ -845,7 +848,9 @@ public class SegmentCompletionTest {
// Fast forward to one second before commit time, and send a lease renewal request for 20s
segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(20);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -853,7 +858,9 @@ public class SegmentCompletionTest {
// Another lease extension in 19s.
segmentCompletionMgr._secconds += 19;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(20);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -867,16 +874,15 @@ public class SegmentCompletionTest {
long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
segmentCompletionMgr._secconds += 55;
- response = segmentCompletionMgr
- .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// now FSM should be out of the map.
Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
}
@Test
- public void testFailedSlowCommit()
- throws Exception {
+ public void testFailedSlowCommit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
final String tableName = new LLCSegmentName(segmentNameStr).getTableName();
@@ -912,7 +918,9 @@ public class SegmentCompletionTest {
// Fast forward to one second before commit time, and send a lease renewal request for 20s
segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(20);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -929,8 +937,7 @@ public class SegmentCompletionTest {
}
@Test
- public void testLeaseTooLong()
- throws Exception {
+ public void testLeaseTooLong() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -965,7 +972,9 @@ public class SegmentCompletionTest {
// Fast forward to one second before commit time, and send a lease renewal request for 20s
segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(20);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -973,9 +982,11 @@ public class SegmentCompletionTest {
final int leaseTimeSec = 20;
// Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale.
- while (segmentCompletionMgr._secconds + leaseTimeSec <= startTime + SegmentCompletionManager
- .getMaxCommitTimeForAllSegmentsSeconds()) {
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ while (segmentCompletionMgr._secconds + leaseTimeSec
+ <= startTime + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) {
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(leaseTimeSec);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -984,7 +995,9 @@ public class SegmentCompletionTest {
}
// Now the lease request should fail.
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withExtraTimeSec(leaseTimeSec);
response = segmentCompletionMgr.extendBuildTime(params);
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED);
@@ -992,8 +1005,7 @@ public class SegmentCompletionTest {
}
@Test
- public void testControllerFailureDuringCommit()
- throws Exception {
+ public void testControllerFailureDuringCommit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -1052,8 +1064,7 @@ public class SegmentCompletionTest {
// Tests that when controller fails, after controller failure, commit doesn't continue. Then because all server instances
// are in holding state, we will ask one to commit.
@Test
- public void testControllerFailureDuringSplitCommit()
- throws Exception {
+ public void testControllerFailureDuringSplitCommit() throws Exception {
SegmentCompletionProtocol.Response response;
Request.Params params;
// s1 sends offset of 20, gets HOLD at t = 5s;
@@ -1104,15 +1115,16 @@ public class SegmentCompletionTest {
Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
// So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back.
- params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+ params = new Request.Params().withInstanceId(s2)
+ .withOffset(s2Offset)
+ .withSegmentName(segmentNameStr)
.withSegmentLocation("location");
response = segmentCompletionMgr.segmentConsumed(params);
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
}
@Test
- public void testNotLeader()
- throws Exception {
+ public void testNotLeader() throws Exception {
testCaseSetup(false, true);
SegmentCompletionProtocol.Response response;
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
@@ -1157,8 +1169,8 @@ public class SegmentCompletionTest {
public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
_segmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
_segmentMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
- _segmentMetadata.setDownloadUrl(ControllerConf
- .constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(),
+ _segmentMetadata.setDownloadUrl(
+ ControllerConf.constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(),
CONTROLLER_CONF.generateVipUrl()));
_segmentMetadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs());
return true;
@@ -1194,15 +1206,15 @@ public class SegmentCompletionTest {
this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
}
- protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
- boolean isConnected) {
+ protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+ boolean isLeader, boolean isConnected) {
this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
}
protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
- controllerLeadershipManager);
+ SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(), controllerLeadershipManager);
_isLeader = isLeader;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org