You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:47:44 UTC
[incubator-pinot] 01/01: Add logic for lead controller resource on
controller side
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 78c134081aee26930b3de37cc86904380ad9c4e4
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Fri Jun 14 16:35:14 2019 -0700
Add logic for lead controller resource on controller side
---
.../apache/pinot/controller/ControllerStarter.java | 44 ++++++++-----
.../pinot/controller/LeadControllerManager.java | 76 ++++++++++++++++++++++
.../PinotSegmentUploadRestletResource.java | 7 +-
.../controller/api/upload/SegmentValidator.java | 10 +--
.../controller/helix/SegmentStatusChecker.java | 8 ++-
.../helix/core/PinotHelixResourceManager.java | 19 +++++-
.../helix/core/minion/PinotTaskManager.java | 8 ++-
.../core/periodictask/ControllerPeriodicTask.java | 9 ++-
.../ControllerPeriodicTaskScheduler.java | 8 +--
.../realtime/PinotLLCRealtimeSegmentManager.java | 32 ++++-----
.../core/realtime/SegmentCompletionManager.java | 54 ++++++++-------
.../core/relocation/RealtimeSegmentRelocator.java | 8 ++-
.../helix/core/retention/RetentionManager.java | 8 ++-
.../core/statemodel/LeadControllerChecker.java | 55 ++++++++++++++++
...rollerResourceMasterSlaveStateModelFactory.java | 64 ++++++++++++++++++
.../BrokerResourceValidationManager.java | 5 +-
.../validation/OfflineSegmentIntervalChecker.java | 13 ++--
.../RealtimeSegmentValidationManager.java | 8 ++-
.../controller/validation/StorageQuotaChecker.java | 14 ++--
.../controller/helix/SegmentStatusCheckerTest.java | 73 ++++++++++++++++++---
.../periodictask/ControllerPeriodicTaskTest.java | 7 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 7 +-
.../helix/core/realtime/SegmentCompletionTest.java | 10 ++-
.../relocation/RealtimeSegmentRelocatorTest.java | 14 ++--
.../helix/core/retention/RetentionManagerTest.java | 12 +++-
.../validation/StorageQuotaCheckerTest.java | 27 ++++----
26 files changed, 455 insertions(+), 145 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..97b6095 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -113,6 +113,7 @@ public class ControllerStarter {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private SegmentCompletionManager _segmentCompletionManager;
private ControllerLeadershipManager _controllerLeadershipManager;
+ private LeadControllerManager _leadControllerManager;
private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
public ControllerStarter(ControllerConf conf) {
@@ -255,6 +256,10 @@ public class ControllerStarter {
_helixResourceManager.start();
HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
+ // Get lead controller manager from resource manager, register controller leadership manager to get helix leader.
+ _leadControllerManager = _helixResourceManager.getLeadControllerManager();
+ _leadControllerManager.registerControllerLeadershipManager(_controllerLeadershipManager);
+
LOGGER.info("Registering controller leadership manager");
// TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove
// ControllerLeadershipManager and this callback.
@@ -266,15 +271,13 @@ public class ControllerStarter {
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
-
_pinotLLCRealtimeSegmentManager =
- new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
- _controllerLeadershipManager);
+ new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _leadControllerManager);
// TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
_segmentCompletionManager =
new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
- _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+ _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
if (_config.getHLCTablesAllowed()) {
LOGGER.info("Realtime tables with High Level consumers will be supported");
@@ -289,7 +292,9 @@ public class ControllerStarter {
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
LOGGER.info("Init controller periodic tasks scheduler");
_controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
- _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
+ _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _leadControllerManager);
+
+ _controllerPeriodicTaskScheduler.start();
LOGGER.info("Registering rebalance segments factory");
_helixResourceManager
@@ -327,7 +332,7 @@ public class ControllerStarter {
bind(_controllerMetrics).to(ControllerMetrics.class);
bind(accessControlFactory).to(AccessControlFactory.class);
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
- bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
+ bind(_leadControllerManager).to(LeadControllerManager.class);
}
});
@@ -433,24 +438,29 @@ public class ControllerStarter {
protected List<PeriodicTask> setupControllerPeriodicTasks() {
LOGGER.info("Setting up periodic tasks");
List<PeriodicTask> periodicTasks = new ArrayList<>();
- _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
+ _taskManager =
+ new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
+ _controllerMetrics);
periodicTasks.add(_taskManager);
- _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
+ _retentionManager =
+ new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_retentionManager);
_offlineSegmentIntervalChecker =
- new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
- _controllerMetrics);
+ new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager,
+ new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_offlineSegmentIntervalChecker);
_realtimeSegmentValidationManager =
- new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
- new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+ new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
+ _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
- new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
+ new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
periodicTasks.add(_brokerResourceValidationManager);
- _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
+ _segmentStatusChecker =
+ new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_segmentStatusChecker);
- _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
+ _realtimeSegmentRelocator =
+ new RealtimeSegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
periodicTasks.add(_realtimeSegmentRelocator);
return periodicTasks;
@@ -482,6 +492,10 @@ public class ControllerStarter {
LOGGER.info("Stopping controller leadership manager");
_controllerLeadershipManager.stop();
+ // Stop controller periodic task.
+ LOGGER.info("Stopping controller periodic tasks");
+ _controllerPeriodicTaskScheduler.stop();
+
// Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
// may interrupt the handlers waiting on an I/O.
_pinotLLCRealtimeSegmentManager.stop();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
new file mode 100644
index 0000000..9e4d3fb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/** Answers per-table leadership from the lead controller resource partitions led by this controller. */
+public class LeadControllerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class);
+
+  private final Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>();
+  private final LeadControllerChecker _leadControllerChecker;
+  // Optional fallback; stays null until registerControllerLeadershipManager() is called.
+  private ControllerLeadershipManager _controllerLeadershipManager;
+
+  public LeadControllerManager() {
+    _leadControllerChecker = new LeadControllerChecker();
+  }
+
+  public void registerControllerLeadershipManager(ControllerLeadershipManager controllerLeadershipManager) {
+    _controllerLeadershipManager = controllerLeadershipManager;
+  }
+
+  /** True if this controller leads the table's partition or, failing that, the registered manager reports leader. */
+  public boolean isLeaderForTable(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    // String.hashCode() can be negative and % keeps the sign; floorMod keeps the index in [0, partition count).
+    int partitionIndex = Math.floorMod(rawTableName.hashCode(), NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+    if (_leadControllerChecker.isPartitionLeader(partitionIndex)) {
+      return true;
+    }
+    return _controllerLeadershipManager != null && _controllerLeadershipManager.isLeader();
+  }
+
+  public synchronized void addPartitionLeader(String partitionName) {
+    _leadControllerChecker.addPartitionLeader(partitionName);
+  }
+
+  public synchronized void removePartitionLeader(String partitionName) {
+    _leadControllerChecker.removePartitionLeader(partitionName);
+  }
+
+  public void subscribe(String className, LeadershipChangeSubscriber subscriber) {
+    LOGGER.info("{} subscribing to leadership changes", className);
+    _subscribers.put(className, subscriber);
+//    subscriber.onBecomingLeader();
+  }
+
+  public void onLeadControllerChange() {
+    // No-op for now: nothing in this class notifies the registered subscribers yet.
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bd4b8a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource {
AccessControlFactory _accessControlFactory;
@Inject
- ControllerLeadershipManager _controllerLeadershipManager;
+ LeadControllerManager _leadControllerManager;
@GET
@Produces(MediaType.APPLICATION_JSON)
@@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource {
// Validate segment
SegmentValidatorResponse segmentValidatorResponse =
new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
- _controllerMetrics, _controllerLeadershipManager)
+ _controllerMetrics, _leadControllerManager)
.validateSegment(rawTableName, segmentMetadata, tempSegmentDir);
// Zk operations
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index 39a9657..42c7cf8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
@@ -55,17 +55,17 @@ public class SegmentValidator {
private final Executor _executor;
private final HttpConnectionManager _connectionManager;
private final ControllerMetrics _controllerMetrics;
- private final ControllerLeadershipManager _controllerLeadershipManager;
+ private final LeadControllerManager _leadControllerManager;
public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
- ControllerLeadershipManager controllerLeadershipManager) {
+ LeadControllerManager leadControllerManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_controllerConf = controllerConf;
_executor = executor;
_connectionManager = connectionManager;
_controllerMetrics = controllerMetrics;
- _controllerLeadershipManager = controllerLeadershipManager;
+ _leadControllerManager = leadControllerManager;
}
public SegmentValidatorResponse validateSegment(String rawTableName, SegmentMetadata segmentMetadata,
@@ -135,7 +135,7 @@ public class SegmentValidator {
new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
StorageQuotaChecker quotaChecker =
new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
- _controllerLeadershipManager);
+ _leadControllerManager);
return quotaChecker.isSegmentStorageWithinQuota(segmentFile, metadata.getName(),
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index f40e2b3..9261420 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.slf4j.Logger;
@@ -56,10 +57,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
* @param pinotHelixResourceManager The resource checker used to interact with Helix
* @param config The controller configuration object
*/
- public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics controllerMetrics) {
+ public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
- config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+ controllerMetrics);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a4d4d0b..e33ee62 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
@@ -99,6 +98,7 @@ import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.common.utils.retry.RetryPolicy;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.pojos.Instance;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -106,6 +106,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -144,6 +145,7 @@ public class PinotHelixResourceManager {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
private TableRebalancer _tableRebalancer;
+ private LeadControllerManager _leadControllerManager;
public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
@@ -170,6 +172,7 @@ public class PinotHelixResourceManager {
* Create Helix cluster if needed, and then start a Pinot controller instance.
*/
public synchronized void start() {
+ _leadControllerManager = new LeadControllerManager();
_helixZkManager = registerAndConnectAsHelixParticipant();
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -255,6 +258,16 @@ public class PinotHelixResourceManager {
return _propertyStore;
}
+
+ /**
+ * Get lead controller manager.
+ *
+ * @return lead controller manager
+ */
+ public LeadControllerManager getLeadControllerManager() {
+ return _leadControllerManager;
+ }
+
/**
* Register and connect to Helix cluster as PARTICIPANT role.
*/
@@ -263,8 +276,8 @@ public class PinotHelixResourceManager {
HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
// Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
- helixManager.getStateMachineEngine()
- .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+ helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name,
+ new LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager));
try {
helixManager.connect();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a09c00..649b966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -51,10 +52,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
private final TaskGeneratorRegistry _taskGeneratorRegistry;
public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
- PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics) {
+ PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
+ ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
- controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, controllerMetrics);
+ controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
+ controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7e764f3..76d106f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
import org.slf4j.Logger;
@@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
protected final PinotHelixResourceManager _pinotHelixResourceManager;
+ protected final LeadControllerManager _leadControllerManager;
protected final ControllerMetrics _controllerMetrics;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+ PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
+ ControllerMetrics controllerMetrics) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
_pinotHelixResourceManager = pinotHelixResourceManager;
+ _leadControllerManager = leadControllerManager;
_controllerMetrics = controllerMetrics;
}
@@ -75,6 +79,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
LOGGER.info("Task: {} is stopped, early terminate the task", _taskName);
break;
}
+ if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+ continue;
+ }
try {
processTable(tableNameWithType, context);
} catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index 8a2b63c..858bfca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -19,7 +19,7 @@
package org.apache.pinot.controller.helix.core.periodictask;
import java.util.List;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.LeadershipChangeSubscriber;
import org.apache.pinot.core.periodictask.PeriodicTask;
import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -39,11 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
* Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
* This is called only once during controller startup
* @param controllerPeriodicTasks
- * @param controllerLeadershipManager
+ * @param leadControllerManager
*/
- public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
+ public void init(List<PeriodicTask> controllerPeriodicTasks, LeadControllerManager leadControllerManager) {
super.init(controllerPeriodicTasks);
- controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+ leadControllerManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..0982c30 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager {
private final TableConfigCache _tableConfigCache;
private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
- private final ControllerLeadershipManager _controllerLeadershipManager;
+ private final LeadControllerManager _leadControllerManager;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+ ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
_helixAdmin = helixResourceManager.getHelixAdmin();
_helixManager = helixResourceManager.getHelixZkManager();
_propertyStore = helixResourceManager.getPropertyStore();
@@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager {
_tableConfigCache = new TableConfigCache(_propertyStore);
_streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
- _controllerLeadershipManager = controllerLeadershipManager;
+ _leadControllerManager = leadControllerManager;
}
@@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
}
- protected boolean isLeader() {
- return _controllerLeadershipManager.isLeader();
+ protected boolean isLeader(String tableName) {
+ return _leadControllerManager.isLeaderForTable(tableName);
}
protected boolean isConnected() {
@@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager {
URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
- if (!isConnected() || !isLeader()) {
+ if (!isConnected() || !isLeader(tableName)) {
// We can potentially log a different value than what we saw ....
LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}",
- segmentName, segmentLocation, tableName, isLeader(), isConnected());
+ segmentName, segmentLocation, tableName, isLeader(tableName), isConnected());
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return false;
}
@@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager {
final String oldZnodePath =
ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, committingSegmentNameStr);
- if (!isConnected() || !isLeader()) {
+ if (!isConnected() || !isLeader(realtimeTableName)) {
// We can potentially log a different value than what we saw ....
LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}",
- committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+ committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return false;
}
boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, realtimeTableName, stat.getVersion());
if (!success) {
LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}",
- committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+ committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
}
return success;
}
@@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager {
ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, newSegmentNameStr);
if (!isNewTableSetup) {
- if (!isLeader() || !isConnected()) {
+ if (!isLeader(realtimeTableName) || !isConnected()) {
// We can potentially log a different value than what we saw ....
LOGGER.warn(
"Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}",
- newSegmentNameStr, rawTableName, isLeader(), isConnected());
+ newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return false;
}
@@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager {
boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, realtimeTableName);
if (!success) {
LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}",
- newSegmentNameStr, rawTableName, isLeader(), isConnected());
+ newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
}
return success;
}
@@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager {
return idealState;
}
-
- public ControllerLeadershipManager getControllerLeadershipManager() {
- return _controllerLeadershipManager;
- }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a4da6b4..8935c6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,7 +19,6 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.pinot.common.utils.SegmentName.SEPARATOR;
+
/**
* This is a singleton class in the controller that drives the state machines for segments that are in the
@@ -73,7 +72,7 @@ public class SegmentCompletionManager {
private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
- private final ControllerLeadershipManager _controllerLeadershipManager;
+ private final LeadControllerManager _leadControllerManager;
private final Lock[] _fsmLocks;
private static final int NUM_FSM_LOCKS = 20;
@@ -87,12 +86,12 @@ public class SegmentCompletionManager {
// TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
- ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+ ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager,
int segmentCommitTimeoutSeconds) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
- _controllerLeadershipManager = controllerLeadershipManager;
+ _leadControllerManager = leadControllerManager;
SegmentCompletionProtocol
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
_fsmLocks = new Lock[NUM_FSM_LOCKS];
@@ -163,11 +162,12 @@ public class SegmentCompletionManager {
* that it currently has (i.e. next offset that it will consume, if it continues to consume).
*/
public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
- if (!isLeader() || !_helixManager.isConnected()) {
+ final String segmentNameStr = reqParams.getSegmentName();
+ final String tableName = segmentNameStr.split(SEPARATOR)[0];
+ if (!isLeader(tableName) || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
- final String segmentNameStr = reqParams.getSegmentName();
final String instanceId = reqParams.getInstanceId();
final String stopReason = reqParams.getReason();
final long offset = reqParams.getOffset();
@@ -201,11 +201,12 @@ public class SegmentCompletionManager {
*/
public SegmentCompletionProtocol.Response segmentCommitStart(
final SegmentCompletionProtocol.Request.Params reqParams) {
- if (!isLeader() || !_helixManager.isConnected()) {
+ final String segmentNameStr = reqParams.getSegmentName();
+ final String tableName = segmentNameStr.split(SEPARATOR)[0];
+ if (!isLeader(tableName) || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
- final String segmentNameStr = reqParams.getSegmentName();
final String instanceId = reqParams.getInstanceId();
final long offset = reqParams.getOffset();
LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
@@ -225,11 +226,12 @@ public class SegmentCompletionManager {
}
public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
- if (!isLeader() || !_helixManager.isConnected()) {
+ final String segmentNameStr = reqParams.getSegmentName();
+ final String tableName = segmentNameStr.split(SEPARATOR)[0];
+ if (!isLeader(tableName) || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
- final String segmentNameStr = reqParams.getSegmentName();
final String instanceId = reqParams.getInstanceId();
final long offset = reqParams.getOffset();
final int extTimeSec = reqParams.getExtraTimeSec();
@@ -256,11 +258,12 @@ public class SegmentCompletionManager {
*/
public SegmentCompletionProtocol.Response segmentStoppedConsuming(
SegmentCompletionProtocol.Request.Params reqParams) {
- if (!isLeader() || !_helixManager.isConnected()) {
+ final String segmentNameStr = reqParams.getSegmentName();
+ final String tableName = segmentNameStr.split(SEPARATOR)[0];
+ if (!isLeader(tableName) || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
- final String segmentNameStr = reqParams.getSegmentName();
final String instanceId = reqParams.getInstanceId();
final long offset = reqParams.getOffset();
final String reason = reqParams.getReason();
@@ -292,11 +295,12 @@ public class SegmentCompletionManager {
*/
public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
- if (!isLeader() || !_helixManager.isConnected()) {
+ final String segmentNameStr = reqParams.getSegmentName();
+ final String tableName = segmentNameStr.split(SEPARATOR)[0];
+ if (!isLeader(tableName) || !_helixManager.isConnected()) {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
return SegmentCompletionProtocol.RESP_NOT_LEADER;
}
- final String segmentNameStr = reqParams.getSegmentName();
LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
SegmentCompletionFSM fsm = null;
SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
@@ -352,6 +356,7 @@ public class SegmentCompletionManager {
State _state = State.HOLDING; // Typically start off in HOLDING state.
final long _startTimeMs;
private final LLCSegmentName _segmentName;
+ private final String _realtimeTableName;
private final int _numReplicas;
private final Set<String> _excludedServerStateMap;
private final Map<String, Long> _commitStateMap;
@@ -394,6 +399,7 @@ public class SegmentCompletionManager {
private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
_segmentName = segmentName;
+ _realtimeTableName = _segmentName.getTableName();
_numReplicas = numReplicas;
_segmentManager = segmentManager;
_commitStateMap = new HashMap<>(_numReplicas);
@@ -403,8 +409,8 @@ public class SegmentCompletionManager {
_maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
_maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
long initialCommitTimeMs =
- MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_segmentName.getTableName());
- Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(segmentName.getTableName());
+ MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
+ Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(_realtimeTableName);
if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
initialCommitTimeMs = savedCommitTime;
}
@@ -413,7 +419,7 @@ public class SegmentCompletionManager {
// The table has a really high value configured for max commit time. Set it to a higher value than default
// and go from there.
LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", initialCommitTimeMs / 1000,
- segmentName.getTableName(), MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
+ _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
}
_initialCommitTimeMs = initialCommitTimeMs;
@@ -681,14 +687,14 @@ public class SegmentCompletionManager {
private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
_state = State.ABORTED;
_segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+ .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return hold(instanceId, offset);
}
private SegmentCompletionProtocol.Response abortAndReturnFailed() {
_state = State.ABORTED;
_segmentCompletionManager._controllerMetrics
- .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+ .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
return SegmentCompletionProtocol.RESP_FAILED;
}
@@ -1117,7 +1123,7 @@ public class SegmentCompletionManager {
}
@VisibleForTesting
- protected boolean isLeader() {
- return _controllerLeadershipManager.isLeader();
+ protected boolean isLeader(String tableName) {
+ return _leadControllerManager.isLeaderForTable(tableName);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index ea5b05b..2c2c017 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory;
public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
- public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics controllerMetrics) {
+ public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
- config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+ controllerMetrics);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 7d5182d..fecb4ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -54,10 +55,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
private final int _deletedSegmentsRetentionInDays;
- public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics controllerMetrics) {
+ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
- config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+ controllerMetrics);
_deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
new file mode 100644
index 0000000..69b56a3
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/**
+ * Tracks the partitions of the lead controller resource that have been recorded as led by this controller.
+ *
+ * <p>Partitions are keyed by the integer that follows the last {@code '_'} in the partition name.
+ */
+public class LeadControllerChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class);
+
+  // Partition index -> partition name for each partition currently recorded as led here.
+  private final Map<Integer, String> _partitionCache = new ConcurrentHashMap<>();
+
+  /**
+   * Records the given partition as led by this controller.
+   *
+   * @param partitionName partition name; the text after its last {@code '_'} must be the integer partition index
+   * @throws NumberFormatException if that suffix is not a valid integer
+   */
+  public void addPartitionLeader(String partitionName) {
+    LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName);
+    // ConcurrentHashMap rejects null values (putIfAbsent(key, null) throws NullPointerException), so the
+    // partition name is stored as the value instead.
+    _partitionCache.putIfAbsent(parsePartitionIndex(partitionName), partitionName);
+  }
+
+  /**
+   * Removes the given partition from the set recorded as led by this controller.
+   *
+   * @param partitionName partition name; the text after its last {@code '_'} must be the integer partition index
+   * @throws NumberFormatException if that suffix is not a valid integer
+   */
+  public void removePartitionLeader(String partitionName) {
+    LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName);
+    _partitionCache.remove(parsePartitionIndex(partitionName));
+  }
+
+  /**
+   * Returns whether the partition with the given index is currently recorded as led by this controller.
+   *
+   * @param partitionIndex index in {@code [0, NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE)}
+   * @throws IllegalArgumentException if the index is outside that range
+   */
+  public boolean isPartitionLeader(int partitionIndex) {
+    Preconditions.checkArgument(
+        partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE,
+        "Invalid partition index: " + partitionIndex);
+    return _partitionCache.containsKey(partitionIndex);
+  }
+
+  /** Parses the integer that follows the last '_' in the partition name. */
+  private static int parsePartitionIndex(String partitionName) {
+    return Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..d063c58
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * State model factory for the lead controller resource. Each state model it creates reports the MASTER/SLAVE
+ * transitions of its partition to the {@link LeadControllerManager}.
+ */
+public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class);
+
+  private final LeadControllerManager _leadControllerManager;
+
+  /**
+   * @param leadControllerManager manager notified whenever a partition gains or loses mastership on this controller
+   */
+  public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) {
+    super();
+    _leadControllerManager = leadControllerManager;
+  }
+
+  /**
+   * Creates the state model for one partition of the resource and tags it with the partition name.
+   */
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel();
+    stateModel.setPartitionName(partitionName);
+    return stateModel;
+  }
+
+  /**
+   * State model that keeps the {@link LeadControllerManager} in step with this partition's MASTER/SLAVE state.
+   */
+  public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel {
+    /**
+     * MASTER -> SLAVE: this controller no longer leads the partition, so remove it from the manager's leaders.
+     */
+    @Override
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      super.onBecomeSlaveFromMaster(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.removePartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+
+    /**
+     * SLAVE -> MASTER: this controller now leads the partition, so register it with the manager.
+     */
+    @Override
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      super.onBecomeMasterFromSlave(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.addPartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 5748d3c..a82a161 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.slf4j.Logger;
@@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
- ControllerMetrics controllerMetrics) {
+ LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
- config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7f19395..0eeadc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.util.SegmentIntervalUtils;
@@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
private final ValidationMetrics _validationMetrics;
public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
- ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
+ LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics,
+ ControllerMetrics controllerMetrics) {
super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
- config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
_validationMetrics = validationMetrics;
}
@@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
segmentIntervals.add(timeInterval);
} else {
- numSegmentsWithInvalidIntervals ++;
+ numSegmentsWithInvalidIntervals++;
}
}
if (numSegmentsWithInvalidIntervals > 0) {
- LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName,
- numSegmentsWithInvalidIntervals);
+ LOGGER
+ .warn("Table: {} has {} segments with invalid interval", offlineTableName, numSegmentsWithInvalidIntervals);
}
Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5eb5a6f..88ad642 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
- PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
- ControllerMetrics controllerMetrics) {
+ LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+ ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
- config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, controllerMetrics);
+ config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+ leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..c78c756 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.slf4j.Logger;
@@ -48,16 +48,16 @@ public class StorageQuotaChecker {
private final TableConfig _tableConfig;
private final ControllerMetrics _controllerMetrics;
private final PinotHelixResourceManager _pinotHelixResourceManager;
- private final ControllerLeadershipManager _controllerLeadershipManager;
+ private final LeadControllerManager _leadControllerManager;
public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
- ControllerLeadershipManager controllerLeadershipManager) {
+ LeadControllerManager leadControllerManager) {
_tableConfig = tableConfig;
_tableSizeReader = tableSizeReader;
_controllerMetrics = controllerMetrics;
_pinotHelixResourceManager = pinotHelixResourceManager;
- _controllerLeadershipManager = controllerLeadershipManager;
+ _leadControllerManager = leadControllerManager;
}
public static class QuotaCheckerResponse {
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
// Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
- if (isLeader() && allowedStorageBytes != 0L) {
+ if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) {
long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
_controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
existingStorageQuotaUtilization);
@@ -213,7 +213,7 @@ public class StorageQuotaChecker {
}
}
- protected boolean isLeader() {
- return _controllerLeadershipManager.isLeader();
+ protected boolean isLeader(String tableName) {
+ return _leadControllerManager.isLeaderForTable(tableName);
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index c235483..066040e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,6 +48,7 @@ import static org.mockito.Mockito.when;
public class SegmentStatusCheckerTest {
private SegmentStatusChecker segmentStatusChecker;
private PinotHelixResourceManager helixResourceManager;
+ private LeadControllerManager leadControllerManager;
private MetricsRegistry metricsRegistry;
private ControllerMetrics controllerMetrics;
private ControllerConf config;
@@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
@@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
@@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
@@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(
@@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
// update metrics
@@ -463,9 +506,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
// verify state before test
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
// update metrics
@@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest {
when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
}
+ {
+ leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+ }
metricsRegistry = new MetricsRegistry();
controllerMetrics = new ControllerMetrics(metricsRegistry);
- segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+ segmentStatusChecker =
+ new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
segmentStatusChecker.start();
segmentStatusChecker.run();
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index a928922..dcd5469 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -27,10 +27,12 @@ import java.util.stream.IntStream;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest {
private final ControllerConf _controllerConf = new ControllerConf();
private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+ private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class);
private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest {
private static final String TASK_NAME = "TestTask";
private final ControllerPeriodicTask _task = new ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
- _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
+ _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _leadControllerManager,
+ _controllerMetrics) {
@Override
protected void setUpTask() {
@@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest {
List<String> tables = new ArrayList<>(_numTables);
IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
when(_resourceManager.getAllTables()).thenReturn(tables);
+ when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
}
private void resetState() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 04e2b7b..af35420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,7 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -66,7 +66,6 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.apache.zookeeper.data.Stat;
@@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
List<String> existingLLCSegments, ControllerMetrics controllerMetrics) {
super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
- new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+ new LeadControllerManager());
try {
TableConfigCache mockCache = mock(TableConfigCache.class);
@@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Override
- protected boolean isLeader() {
+ protected boolean isLeader(String tableName) {
return IS_LEADER;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..22e317f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -1151,8 +1150,7 @@ public class SegmentCompletionTest {
protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
ControllerMetrics controllerMetrics) {
- super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
- new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+ super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new LeadControllerManager());
}
@Override
@@ -1210,7 +1208,7 @@ public class SegmentCompletionTest {
protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
boolean isLeader, ControllerMetrics controllerMetrics) {
super(helixManager, segmentManager, controllerMetrics,
- new ControllerLeadershipManager(helixManager, controllerMetrics),
+ new LeadControllerManager(),
SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
_isLeader = isLeader;
}
@@ -1221,7 +1219,7 @@ public class SegmentCompletionTest {
}
@Override
- protected boolean isLeader() {
+ protected boolean isLeader(String tableName) {
return _isLeader;
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 8c5c8ec..9316062 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest {
private TestRealtimeSegmentRelocator _realtimeSegmentRelocator;
private HelixManager _mockHelixManager;
+ private LeadControllerManager _leadControllerManager;
private String[] serverNames;
private String[] consumingServerNames;
@@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest {
PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
_mockHelixManager = mock(HelixManager.class);
when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
+ LeadControllerManager mockLeadControllerManager = mock(LeadControllerManager.class);
+ when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
ControllerConf controllerConfig = new ControllerConf();
ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
_realtimeSegmentRelocator =
- new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
+ new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, mockLeadControllerManager, controllerConfig,
+ controllerMetrics);
final int maxInstances = 20;
serverNames = new String[maxInstances];
@@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest {
private Map<String, List<String>> tagToInstances;
- public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics controllerMetrics) {
- super(pinotHelixResourceManager, config, controllerMetrics);
+ public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+ super(pinotHelixResourceManager, leadControllerManager, config, controllerMetrics);
tagToInstances = new HashedMap();
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d4adfe1..465a9af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,6 +85,9 @@ public class RetentionManagerTest {
PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
+ LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
@@ -91,7 +95,8 @@ public class RetentionManagerTest {
ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+ RetentionManager retentionManager =
+ new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
retentionManager.start();
retentionManager.run();
@@ -210,11 +215,14 @@ public class RetentionManagerTest {
setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
+ LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+ when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
ControllerConf conf = new ControllerConf();
ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
retentionManager.start();
retentionManager.run();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index e84a947..f8fb6ca 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest {
private TableConfig _tableConfig;
private ControllerMetrics _controllerMetrics;
private PinotHelixResourceManager _pinotHelixResourceManager;
- private ControllerLeadershipManager _controllerLeadershipManager;
+ private LeadControllerManager _leadControllerManager;
private QuotaConfig _quotaConfig;
private SegmentsValidationAndRetentionConfig _validationConfig;
private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest {
_controllerMetrics = new ControllerMetrics(new MetricsRegistry());
_validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
_pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
- _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
+ _leadControllerManager = mock(LeadControllerManager.class);
when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
when(_validationConfig.getReplicationNumber()).thenReturn(2);
TEST_DIR.mkdirs();
@@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest {
throws InvalidConfigException {
StorageQuotaChecker checker =
new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
- _controllerLeadershipManager);
+ _leadControllerManager);
when(_tableConfig.getQuotaConfig()).thenReturn(null);
- StorageQuotaChecker.QuotaCheckerResponse res =
- checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+ StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
Assert.assertTrue(res.isSegmentWithinQuota);
}
@@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest {
throws InvalidConfigException {
StorageQuotaChecker checker =
new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
- _controllerLeadershipManager);
+ _leadControllerManager);
when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
- StorageQuotaChecker.QuotaCheckerResponse res =
- checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+ StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
Assert.assertTrue(res.isSegmentWithinQuota);
}
@@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest {
when(_quotaConfig.getStorage()).thenReturn("3K");
StorageQuotaChecker checker =
new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
- _controllerLeadershipManager);
- StorageQuotaChecker.QuotaCheckerResponse response =
- checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
+ _leadControllerManager);
+ StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
Assert.assertTrue(response.isSegmentWithinQuota);
Assert.assertEquals(
_controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L);
@@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest {
public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
- ControllerLeadershipManager controllerLeadershipManager) {
- super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
+ LeadControllerManager leadControllerManager) {
+ super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, leadControllerManager);
}
@Override
- protected boolean isLeader() {
+ protected boolean isLeader(String tableName) {
return true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org