You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/10 20:29:18 UTC
[incubator-pinot] branch master updated: Refactor ControllerLeaderManager by using dependency injection instead of singleton (#4054)
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e6f0341 Refactor ControllerLeaderManager by using dependency injection instead of singleton (#4054)
e6f0341 is described below
commit e6f03416d1c7ae3f630399081fec5c60a3da4aac
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Wed Apr 10 13:29:12 2019 -0700
Refactor ControllerLeaderManager by using dependency injection instead of singleton (#4054)
---
.../controller/ControllerLeadershipManager.java | 24 +-------
.../apache/pinot/controller/ControllerStarter.java | 14 +++--
.../PinotSegmentUploadRestletResource.java | 6 +-
.../controller/api/upload/SegmentValidator.java | 9 ++-
.../ControllerPeriodicTaskScheduler.java | 5 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 20 ++++---
.../core/realtime/PinotRealtimeSegmentManager.java | 8 ++-
.../core/realtime/SegmentCompletionManager.java | 10 ++--
.../controller/validation/StorageQuotaChecker.java | 7 ++-
.../PinotLLCRealtimeSegmentManagerTest.java | 65 ++++++++++++----------
.../helix/core/realtime/SegmentCompletionTest.java | 65 +++++++++++++---------
.../validation/StorageQuotaCheckerTest.java | 17 ++++--
12 files changed, 140 insertions(+), 110 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
index 6ffa655..46160c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
@@ -42,34 +42,12 @@ public class ControllerLeadershipManager {
private Map<String, LeadershipChangeSubscriber> _subscribers = new ConcurrentHashMap<>();
- private ControllerLeadershipManager(HelixManager helixManager) {
+ public ControllerLeadershipManager(HelixManager helixManager) {
_helixManager = helixManager;
_helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange());
}
/**
- * Create an instance of ControllerLeadershipManager
- * @param helixManager
- */
- public static synchronized void init(HelixManager helixManager) {
- if (INSTANCE != null) {
- throw new RuntimeException("Instance of ControllerLeadershipManager already created");
- }
- INSTANCE = new ControllerLeadershipManager(helixManager);
- }
-
- /**
- * Get the instance of ControllerLeadershipManager
- * @return
- */
- public static synchronized ControllerLeadershipManager getInstance() {
- if (INSTANCE == null) {
- throw new RuntimeException("Instance of ControllerLeadershipManager not yet created");
- }
- return INSTANCE;
- }
-
- /**
* When stopping this service, if the controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()}
*/
public void stop() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 4051990..10c1c00 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -107,6 +107,7 @@ public class ControllerStarter {
private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
private PinotHelixTaskResourceManager _helixTaskResourceManager;
private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+ private ControllerLeadershipManager _controllerLeadershipManager;
private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
public ControllerStarter(ControllerConf conf) {
@@ -230,9 +231,9 @@ public class ControllerStarter {
// Note: Currently leadership depends on helix controller, thus assign helixControllerManager to ControllerLeadershipManager.
// TODO: In the future when Helix separation is completed, leadership only depends on the master in leadControllerResource, and ControllerLeadershipManager will be removed.
if (_helixControllerManager != null) {
- ControllerLeadershipManager.init(_helixControllerManager);
+ _controllerLeadershipManager = new ControllerLeadershipManager(_helixControllerManager);
} else {
- ControllerLeadershipManager.init(helixParticipantManager);
+ _controllerLeadershipManager = new ControllerLeadershipManager(helixParticipantManager);
}
LOGGER.info("Starting task resource manager");
@@ -240,15 +241,15 @@ public class ControllerStarter {
// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
- PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics);
- _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
+ PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+ _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
_realtimeSegmentsManager.start(_controllerMetrics);
// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
LOGGER.info("Init controller periodic tasks scheduler");
_controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
- _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks);
+ _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
LOGGER.info("Creating rebalance segments factory");
RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
@@ -284,6 +285,7 @@ public class ControllerStarter {
bind(_controllerMetrics).to(ControllerMetrics.class);
bind(accessControlFactory).to(AccessControlFactory.class);
bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
+ bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
}
});
@@ -435,7 +437,7 @@ public class ControllerStarter {
try {
// Stopping ControllerLeadershipManager has to be done before stopping HelixResourceManager.
LOGGER.info("Stopping controller leadership manager");
- ControllerLeadershipManager.getInstance().stop();
+ _controllerLeadershipManager.stop();
// Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
// may interrupt the handlers waiting on an I/O.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 0bbb4b0..4581d9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.api.access.AccessControl;
import org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -115,6 +116,9 @@ public class PinotSegmentUploadRestletResource {
@Inject
AccessControlFactory _accessControlFactory;
+ @Inject
+ ControllerLeadershipManager _controllerLeadershipManager;
+
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments")
@@ -319,7 +323,7 @@ public class PinotSegmentUploadRestletResource {
// Validate segment
SegmentValidatorResponse segmentValidatorResponse =
new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
- _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
+ _controllerMetrics, _controllerLeadershipManager).validateSegment(segmentMetadata, tempSegmentDir);
// Zk operations
completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index a7dc32e..740cd22 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.api.resources.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
@@ -54,14 +55,17 @@ public class SegmentValidator {
private final Executor _executor;
private final HttpConnectionManager _connectionManager;
private final ControllerMetrics _controllerMetrics;
+ private final ControllerLeadershipManager _controllerLeadershipManager;
public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
- Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics) {
+ Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
+ ControllerLeadershipManager controllerLeadershipManager) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_controllerConf = controllerConf;
_executor = executor;
_connectionManager = connectionManager;
_controllerMetrics = controllerMetrics;
+ _controllerLeadershipManager = controllerLeadershipManager;
}
public SegmentValidatorResponse validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
@@ -130,7 +134,8 @@ public class SegmentValidator {
TableSizeReader tableSizeReader =
new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
StorageQuotaChecker quotaChecker =
- new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+ _controllerLeadershipManager);
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(metadata.getTableName());
return quotaChecker.isSegmentStorageWithinQuota(segmentFile, offlineTableName, metadata.getName(),
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index c5ce341..8a2b63c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -39,10 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
* Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
* This is called only once during controller startup
* @param controllerPeriodicTasks
+ * @param controllerLeadershipManager
*/
- public void init(List<PeriodicTask> controllerPeriodicTasks) {
+ public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
super.init(controllerPeriodicTasks);
- ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+ controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c26780b..324b9ae 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -126,6 +126,7 @@ public class PinotLLCRealtimeSegmentManager {
private final TableConfigCache _tableConfigCache;
private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+ private final ControllerLeadershipManager _controllerLeadershipManager;
private volatile boolean _isStopping = false;
private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -139,22 +140,22 @@ public class PinotLLCRealtimeSegmentManager {
}
public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics) {
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
- controllerConf, controllerMetrics);
+ controllerConf, controllerMetrics, controllerLeadershipManager);
}
private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics) {
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
if (INSTANCE != null) {
throw new RuntimeException("Instance already created");
}
INSTANCE =
new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
- controllerConf, controllerMetrics);
- SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics);
+ controllerConf, controllerMetrics, controllerLeadershipManager);
+ SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
}
public void stop() {
@@ -186,7 +187,7 @@ public class PinotLLCRealtimeSegmentManager {
protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics) {
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
_helixAdmin = helixAdmin;
_helixManager = helixManager;
_propertyStore = propertyStore;
@@ -202,6 +203,7 @@ public class PinotLLCRealtimeSegmentManager {
_tableConfigCache = new TableConfigCache(_propertyStore);
_streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+ _controllerLeadershipManager = controllerLeadershipManager;
}
public static PinotLLCRealtimeSegmentManager getInstance() {
@@ -212,7 +214,7 @@ public class PinotLLCRealtimeSegmentManager {
}
protected boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ return _controllerLeadershipManager.isLeader();
}
protected boolean isConnected() {
@@ -1376,4 +1378,8 @@ public class PinotLLCRealtimeSegmentManager {
return idealState;
}
+
+ public ControllerLeadershipManager getControllerLeadershipManager() {
+ return _controllerLeadershipManager;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index b92acaf..5a849e1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -80,9 +80,11 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
private final PinotHelixResourceManager _pinotHelixResourceManager;
private ZkClient _zkClient;
private ControllerMetrics _controllerMetrics;
+ private final ControllerLeadershipManager _controllerLeadershipManager;
- public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager) {
+ public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager, ControllerLeadershipManager controllerLeadershipManager) {
_pinotHelixResourceManager = pinotManager;
+ _controllerLeadershipManager = controllerLeadershipManager;
String clusterName = _pinotHelixResourceManager.getHelixClusterName();
_propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
_tableConfigPath = _propertyStorePath + TABLE_CONFIG;
@@ -102,7 +104,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
_zkClient.subscribeDataChanges(_tableConfigPath, this);
// Subscribe to leadership changes
- ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
+ _controllerLeadershipManager.subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
// Setup change listeners for already existing tables, if any.
processPropertyStoreChange(_tableConfigPath);
@@ -271,7 +273,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
}
private boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ return _controllerLeadershipManager.isLeader();
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7591898..618c4a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -73,6 +73,7 @@ public class SegmentCompletionManager {
private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
+ private final ControllerLeadershipManager _controllerLeadershipManager;
// Half hour max commit time for all segments
private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -84,10 +85,11 @@ public class SegmentCompletionManager {
// TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
- ControllerMetrics controllerMetrics) {
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
_helixManager = helixManager;
_segmentManager = segmentManager;
_controllerMetrics = controllerMetrics;
+ _controllerLeadershipManager = controllerLeadershipManager;
}
public boolean isSplitCommitEnabled() {
@@ -100,11 +102,11 @@ public class SegmentCompletionManager {
public static SegmentCompletionManager create(HelixManager helixManager,
PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
- ControllerMetrics controllerMetrics) {
+ ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
if (_instance != null) {
throw new RuntimeException("Cannot create multiple instances");
}
- _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics);
+ _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
return _instance;
@@ -1122,6 +1124,6 @@ public class SegmentCompletionManager {
@VisibleForTesting
protected boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ return _controllerLeadershipManager.isLeader();
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index 64c78ae..156ad9f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -48,13 +48,16 @@ public class StorageQuotaChecker {
private final TableConfig _tableConfig;
private final ControllerMetrics _controllerMetrics;
private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final ControllerLeadershipManager _controllerLeadershipManager;
public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
- ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
+ ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
+ ControllerLeadershipManager controllerLeadershipManager) {
_tableConfig = tableConfig;
_tableSizeReader = tableSizeReader;
_controllerMetrics = controllerMetrics;
_pinotHelixResourceManager = pinotHelixResourceManager;
+ _controllerLeadershipManager = controllerLeadershipManager;
}
public static class QuotaCheckerResponse {
@@ -213,6 +216,6 @@ public class StorageQuotaChecker {
}
protected boolean isLeader() {
- return ControllerLeadershipManager.getInstance().isLeader();
+ return _controllerLeadershipManager.isLeader();
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 8153559..4f881ff 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
@@ -557,8 +558,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
System.currentTimeMillis());
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0);
- segmentManager.updateOldSegmentMetadataZNRecord(tableName, latestSegment,
- latestMetadata.getStartOffset() + 100, committingSegmentDescriptor);
+ segmentManager
+ .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+ committingSegmentDescriptor);
segmentManager.createNewSegmentMetadataZNRecord(tableConfig, latestSegment, newLlcSegmentName,
expectedPartitionAssignment, committingSegmentDescriptor, false);
@@ -633,7 +635,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
segmentManager
.updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
- new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
+ new CommittingSegmentDescriptor(latestSegment.getSegmentName(),
+ latestMetadata.getStartOffset() + 100, 0));
idealState = idealStateBuilder.setSegmentState(latestSegment.getSegmentName(), "ONLINE").build();
// get old state
@@ -668,7 +671,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
segmentManager
.updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
- new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
+ new CommittingSegmentDescriptor(latestSegment.getSegmentName(),
+ latestMetadata.getStartOffset() + 100, 0));
// get old state
nPartitions = expectedPartitionAssignment.getNumPartitions();
@@ -895,9 +899,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
segmentManager.IS_CONNECTED = false;
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager.newMockSegmentMetadata());
+ CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
Assert.assertFalse(status);
Assert.assertEquals(segmentManager._nCallsToUpdateHelix, 0); // Idealstate not updated
@@ -952,8 +955,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
Set<String> prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
+ CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -976,8 +979,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager.newMockSegmentMetadata());
+ committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -996,8 +999,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
// We do not expect the segment metadata to be used. Thus reuse the current metadata.
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager.getMockSegmentMetadata());
+ committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1013,8 +1016,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentManager._records.clear();
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
// We do not expect the segment metadata to be used. Thus reuse the current metadata.
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager.getMockSegmentMetadata());
+ committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.getMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertFalse(status);
@@ -1029,8 +1032,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
committingSegmentMetadata = new LLCRealtimeSegmentZKMetadata(newZnRec);
prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager.newMockSegmentMetadata());
+ committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
segmentManager.verifyMetadataInteractions();
Assert.assertTrue(status);
@@ -1099,9 +1102,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
new LLCRealtimeSegmentZKMetadata(segmentManager2._records.get(committingPartition));
reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
- CommittingSegmentDescriptor committingSegmentDescriptor =
- CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
- segmentManager1.newMockSegmentMetadata());
+ CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+ .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager1.newMockSegmentMetadata());
boolean status = segmentManager1.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
Assert.assertTrue(status); // Committing segment metadata succeeded.
@@ -1119,7 +1121,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
* @throws InvalidConfigException
*/
@Test
- public void testIdealStateAlreadyUpdated() throws InvalidConfigException {
+ public void testIdealStateAlreadyUpdated()
+ throws InvalidConfigException {
FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
String tableNameWithType = "tableName_REALTIME";
String rawTableName = "tableName";
@@ -1130,9 +1133,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
IdealState idealState = segmentManager._tableIdealState;
TableConfig tableConfig = segmentManager._tableConfigStore.getTableConfig(tableNameWithType);
- PartitionAssignment partitionAssignment =
- segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
- idealState);
+ PartitionAssignment partitionAssignment = segmentManager._partitionAssignmentGenerator
+ .getStreamPartitionAssignmentFromIdealState(tableConfig, idealState);
int partitionId = 0;
int seq = 0;
IdealStateBuilderUtil idealStateBuilderUtil = new IdealStateBuilderUtil(idealState, tableNameWithType);
@@ -1165,7 +1167,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
Assert.assertEquals(idealState, idealStateCopy);
}
- private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName, int nPartitions, int nReplicas, int nInstances)
+ private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName,
+ int nPartitions, int nReplicas, int nInstances)
throws InvalidConfigException {
List<String> instances = getInstanceList(nInstances);
@@ -1338,9 +1341,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
private TableConfigStore _tableConfigStore;
- protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
-
- super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()));
+ protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments, HelixManager helixManager) {
+ super(null, clusterName, helixManager, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+ new ControllerLeadershipManager(helixManager));
try {
TableConfigCache mockCache = mock(TableConfigCache.class);
TableConfig mockTableConfig = mock(TableConfig.class);
@@ -1375,6 +1378,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
_tableConfigStore = new TableConfigStore();
}
+ protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
+ this(existingLLCSegments, mock(HelixManager.class));
+ }
+
private SegmentMetadataImpl newMockSegmentMetadata() {
segmentMetadata = mock(SegmentMetadataImpl.class);
when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
@@ -1480,7 +1487,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
}
-
@Override
public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
Stat stat) {
@@ -1500,7 +1506,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
return metadata;
}
-
public void verifyMetadataInteractions() {
verify(segmentMetadata, times(1)).getCrc();
verify(segmentMetadata, times(2)).getTimeInterval();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 3b8503e..0dfe6b4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
@@ -64,7 +65,8 @@ public class SegmentCompletionTest {
public void testCaseSetup(boolean isLeader, boolean isConnected)
throws Exception {
- segmentManager = new MockPinotLLCRealtimeSegmentManager();
+ segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
+ ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
final int partitionId = 23;
final int seqId = 12;
final long now = System.currentTimeMillis();
@@ -76,7 +78,8 @@ public class SegmentCompletionTest {
metadata.setNumReplicas(3);
segmentManager._segmentMetadata = metadata;
- segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+ segmentCompletionMgr =
+ new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
segmentManager._segmentCompletionMgr = segmentCompletionMgr;
Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -151,8 +154,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -221,8 +224,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -332,8 +335,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("doNotCommitMe");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have aborted
@@ -366,8 +369,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
// And the FSM should be removed.
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -419,8 +422,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -469,8 +472,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
- response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
// Now the FSM should have disappeared from the map
@@ -529,8 +532,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
@@ -602,8 +605,8 @@ public class SegmentCompletionTest {
segmentCompletionMgr._secconds += 5;
params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// We ask S2 to keep the segment
params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
@@ -670,7 +673,7 @@ public class SegmentCompletionTest {
params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
.withSegmentLocation("location");
response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// Now the FSM should have disappeared from the map
Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -864,8 +867,8 @@ public class SegmentCompletionTest {
long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
segmentCompletionMgr._secconds += 55;
- response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
- CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+ response = segmentCompletionMgr
+ .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
// now FSM should be out of the map.
Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
@@ -1137,9 +1140,11 @@ public class SegmentCompletionTest {
private static final ControllerConf CONTROLLER_CONF = new ControllerConf();
public LLCSegmentName _stoppedSegmentName;
public String _stoppedInstance;
+ public HelixManager _helixManager = mock(HelixManager.class);
- protected MockPinotLLCRealtimeSegmentManager() {
- super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()));
+ protected MockPinotLLCRealtimeSegmentManager(boolean isLeader, boolean isConnected) {
+ super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+ new ControllerLeadershipManager(createMockHelixManager(isLeader, isConnected)));
}
@Override
@@ -1186,8 +1191,18 @@ public class SegmentCompletionTest {
protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
boolean isConnected) {
- super(createMockHelixManager(isLeader, isConnected), segmentManager,
- new ControllerMetrics(new MetricsRegistry()));
+ this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
+ }
+
+ protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
+ boolean isConnected) {
+ this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
+ }
+
+ protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+ boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
+ super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
+ controllerLeadershipManager);
_isLeader = isLeader;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index f69a42c..ed38a50 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.TableSizeReader;
import org.mockito.invocation.InvocationOnMock;
@@ -48,6 +49,7 @@ public class StorageQuotaCheckerTest {
private TableConfig _tableConfig;
private ControllerMetrics _controllerMetrics;
private PinotHelixResourceManager _pinotHelixResourceManager;
+ private ControllerLeadershipManager _controllerLeadershipManager;
private QuotaConfig _quotaConfig;
private SegmentsValidationAndRetentionConfig _validationConfig;
private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -60,6 +62,7 @@ public class StorageQuotaCheckerTest {
_controllerMetrics = new ControllerMetrics(new MetricsRegistry());
_validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
_pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+ _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
when(_validationConfig.getReplicationNumber()).thenReturn(2);
TEST_DIR.mkdirs();
@@ -74,7 +77,8 @@ public class StorageQuotaCheckerTest {
public void testNoQuota()
throws InvalidConfigException {
StorageQuotaChecker checker =
- new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+ _controllerLeadershipManager);
when(_tableConfig.getQuotaConfig()).thenReturn(null);
StorageQuotaChecker.QuotaCheckerResponse res =
checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 1000);
@@ -85,7 +89,8 @@ public class StorageQuotaCheckerTest {
public void testNoStorageQuotaConfig()
throws InvalidConfigException {
StorageQuotaChecker checker =
- new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+ _controllerLeadershipManager);
when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
StorageQuotaChecker.QuotaCheckerResponse res =
@@ -128,7 +133,8 @@ public class StorageQuotaCheckerTest {
when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
when(_quotaConfig.getStorage()).thenReturn("3K");
StorageQuotaChecker checker =
- new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+ new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+ _controllerLeadershipManager);
StorageQuotaChecker.QuotaCheckerResponse response =
checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
Assert.assertTrue(response.isSegmentWithinQuota);
@@ -177,8 +183,9 @@ public class StorageQuotaCheckerTest {
private class MockStorageQuotaChecker extends StorageQuotaChecker {
public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
- ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
- super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager);
+ ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
+ ControllerLeadershipManager controllerLeadershipManager) {
+ super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org