You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/10 20:29:18 UTC

[incubator-pinot] branch master updated: Refactor ControllerLeadershipManager by using dependency injection instead of singleton (#4054)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e6f0341  Refactor ControllerLeadershipManager by using dependency injection instead of singleton (#4054)
e6f0341 is described below

commit e6f03416d1c7ae3f630399081fec5c60a3da4aac
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Wed Apr 10 13:29:12 2019 -0700

    Refactor ControllerLeadershipManager by using dependency injection instead of singleton (#4054)
---
 .../controller/ControllerLeadershipManager.java    | 24 +-------
 .../apache/pinot/controller/ControllerStarter.java | 14 +++--
 .../PinotSegmentUploadRestletResource.java         |  6 +-
 .../controller/api/upload/SegmentValidator.java    |  9 ++-
 .../ControllerPeriodicTaskScheduler.java           |  5 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 20 ++++---
 .../core/realtime/PinotRealtimeSegmentManager.java |  8 ++-
 .../core/realtime/SegmentCompletionManager.java    | 10 ++--
 .../controller/validation/StorageQuotaChecker.java |  7 ++-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 65 ++++++++++++----------
 .../helix/core/realtime/SegmentCompletionTest.java | 65 +++++++++++++---------
 .../validation/StorageQuotaCheckerTest.java        | 17 ++++--
 12 files changed, 140 insertions(+), 110 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
index 6ffa655..46160c6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerLeadershipManager.java
@@ -42,34 +42,12 @@ public class ControllerLeadershipManager {
 
   private Map<String, LeadershipChangeSubscriber> _subscribers = new ConcurrentHashMap<>();
 
-  private ControllerLeadershipManager(HelixManager helixManager) {
+  public ControllerLeadershipManager(HelixManager helixManager) {
     _helixManager = helixManager;
     _helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange());
   }
 
   /**
-   * Create an instance of ControllerLeadershipManager
-   * @param helixManager
-   */
-  public static synchronized void init(HelixManager helixManager) {
-    if (INSTANCE != null) {
-      throw new RuntimeException("Instance of ControllerLeadershipManager already created");
-    }
-    INSTANCE = new ControllerLeadershipManager(helixManager);
-  }
-
-  /**
-   * Get the instance of ControllerLeadershipManager
-   * @return
-   */
-  public static synchronized ControllerLeadershipManager getInstance() {
-    if (INSTANCE == null) {
-      throw new RuntimeException("Instance of ControllerLeadershipManager not yet created");
-    }
-    return INSTANCE;
-  }
-
-  /**
    * When stopping this service, if the controller is leader, invoke {@link ControllerLeadershipManager#onBecomingNonLeader()}
    */
   public void stop() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 4051990..10c1c00 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -107,6 +107,7 @@ public class ControllerStarter {
   private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+  private ControllerLeadershipManager _controllerLeadershipManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
@@ -230,9 +231,9 @@ public class ControllerStarter {
     // Note: Currently leadership depends on helix controller, thus assign helixControllerManager to ControllerLeadershipManager.
     // TODO: In the future when Helix separation is completed, leadership only depends on the master in leadControllerResource, and ControllerLeadershipManager will be removed.
     if (_helixControllerManager != null) {
-      ControllerLeadershipManager.init(_helixControllerManager);
+      _controllerLeadershipManager = new ControllerLeadershipManager(_helixControllerManager);
     } else {
-      ControllerLeadershipManager.init(helixParticipantManager);
+      _controllerLeadershipManager = new ControllerLeadershipManager(helixParticipantManager);
     }
 
     LOGGER.info("Starting task resource manager");
@@ -240,15 +241,15 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-    PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics);
-    _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
+    PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+    _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
     _realtimeSegmentsManager.start(_controllerMetrics);
 
     // Setting up periodic tasks
     List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
     _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
-    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks);
+    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
 
     LOGGER.info("Creating rebalance segments factory");
     RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
@@ -284,6 +285,7 @@ public class ControllerStarter {
         bind(_controllerMetrics).to(ControllerMetrics.class);
         bind(accessControlFactory).to(AccessControlFactory.class);
         bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
+        bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
       }
     });
 
@@ -435,7 +437,7 @@ public class ControllerStarter {
     try {
       // Stopping ControllerLeadershipManager has to be done before stopping HelixResourceManager.
       LOGGER.info("Stopping controller leadership manager");
-      ControllerLeadershipManager.getInstance().stop();
+      _controllerLeadershipManager.stop();
 
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 0bbb4b0..4581d9d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -70,6 +70,7 @@ import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -115,6 +116,9 @@ public class PinotSegmentUploadRestletResource {
   @Inject
   AccessControlFactory _accessControlFactory;
 
+  @Inject
+  ControllerLeadershipManager _controllerLeadershipManager;
+
   @GET
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/segments")
@@ -319,7 +323,7 @@ public class PinotSegmentUploadRestletResource {
       // Validate segment
       SegmentValidatorResponse segmentValidatorResponse =
           new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-              _controllerMetrics).validateSegment(segmentMetadata, tempSegmentDir);
+              _controllerMetrics, _controllerLeadershipManager).validateSegment(segmentMetadata, tempSegmentDir);
 
       // Zk operations
       completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, segmentMetadata,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index a7dc32e..740cd22 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -54,14 +55,17 @@ public class SegmentValidator {
   private final Executor _executor;
   private final HttpConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
+  private final ControllerLeadershipManager _controllerLeadershipManager;
 
   public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
-      Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics) {
+      Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
+      ControllerLeadershipManager controllerLeadershipManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _controllerConf = controllerConf;
     _executor = executor;
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
+    _controllerLeadershipManager = controllerLeadershipManager;
   }
 
   public SegmentValidatorResponse validateSegment(SegmentMetadata segmentMetadata, File tempSegmentDir) {
@@ -130,7 +134,8 @@ public class SegmentValidator {
     TableSizeReader tableSizeReader =
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker quotaChecker =
-        new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+        new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+            _controllerLeadershipManager);
     String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(metadata.getTableName());
     return quotaChecker.isSegmentStorageWithinQuota(segmentFile, offlineTableName, metadata.getName(),
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index c5ce341..8a2b63c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -39,10 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
    * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
    * This is called only once during controller startup
    * @param controllerPeriodicTasks
+   * @param controllerLeadershipManager
    */
-  public void init(List<PeriodicTask> controllerPeriodicTasks) {
+  public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
     super.init(controllerPeriodicTasks);
-    ControllerLeadershipManager.getInstance().subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+    controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c26780b..324b9ae 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -126,6 +126,7 @@ public class PinotLLCRealtimeSegmentManager {
   private final TableConfigCache _tableConfigCache;
   private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
+  private final ControllerLeadershipManager _controllerLeadershipManager;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -139,22 +140,22 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
     create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
         helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
-        controllerConf, controllerMetrics);
+        controllerConf, controllerMetrics, controllerLeadershipManager);
   }
 
   private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
       ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
     if (INSTANCE != null) {
       throw new RuntimeException("Instance already created");
     }
     INSTANCE =
         new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
-            controllerConf, controllerMetrics);
-    SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics);
+            controllerConf, controllerMetrics, controllerLeadershipManager);
+    SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
   }
 
   public void stop() {
@@ -186,7 +187,7 @@ public class PinotLLCRealtimeSegmentManager {
 
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
       ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
     _helixAdmin = helixAdmin;
     _helixManager = helixManager;
     _propertyStore = propertyStore;
@@ -202,6 +203,7 @@ public class PinotLLCRealtimeSegmentManager {
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _controllerLeadershipManager = controllerLeadershipManager;
   }
 
   public static PinotLLCRealtimeSegmentManager getInstance() {
@@ -212,7 +214,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+    return _controllerLeadershipManager.isLeader();
   }
 
   protected boolean isConnected() {
@@ -1376,4 +1378,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     return idealState;
   }
+
+  public ControllerLeadershipManager getControllerLeadershipManager() {
+    return _controllerLeadershipManager;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index b92acaf..5a849e1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -80,9 +80,11 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   private final PinotHelixResourceManager _pinotHelixResourceManager;
   private ZkClient _zkClient;
   private ControllerMetrics _controllerMetrics;
+  private final ControllerLeadershipManager _controllerLeadershipManager;
 
-  public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager) {
+  public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager, ControllerLeadershipManager controllerLeadershipManager) {
     _pinotHelixResourceManager = pinotManager;
+    _controllerLeadershipManager = controllerLeadershipManager;
     String clusterName = _pinotHelixResourceManager.getHelixClusterName();
     _propertyStorePath = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, clusterName);
     _tableConfigPath = _propertyStorePath + TABLE_CONFIG;
@@ -102,7 +104,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
     _zkClient.subscribeDataChanges(_tableConfigPath, this);
 
     // Subscribe to leadership changes
-    ControllerLeadershipManager.getInstance().subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
+    _controllerLeadershipManager.subscribe(PinotLLCRealtimeSegmentManager.class.getName(), this);
 
     // Setup change listeners for already existing tables, if any.
     processPropertyStoreChange(_tableConfigPath);
@@ -271,7 +273,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   }
 
   private boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+    return _controllerLeadershipManager.isLeader();
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 7591898..618c4a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -73,6 +73,7 @@ public class SegmentCompletionManager {
   private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
+  private final ControllerLeadershipManager _controllerLeadershipManager;
 
   // Half hour max commit time for all segments
   private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -84,10 +85,11 @@ public class SegmentCompletionManager {
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
   protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
+    _controllerLeadershipManager = controllerLeadershipManager;
   }
 
   public boolean isSplitCommitEnabled() {
@@ -100,11 +102,11 @@ public class SegmentCompletionManager {
 
   public static SegmentCompletionManager create(HelixManager helixManager,
       PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
     if (_instance != null) {
       throw new RuntimeException("Cannot create multiple instances");
     }
-    _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics);
+    _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
     SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
         TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
     return _instance;
@@ -1122,6 +1124,6 @@ public class SegmentCompletionManager {
 
   @VisibleForTesting
   protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+    return _controllerLeadershipManager.isLeader();
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index 64c78ae..156ad9f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -48,13 +48,16 @@ public class StorageQuotaChecker {
   private final TableConfig _tableConfig;
   private final ControllerMetrics _controllerMetrics;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private final ControllerLeadershipManager _controllerLeadershipManager;
 
   public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
-      ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
+      ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
+      ControllerLeadershipManager controllerLeadershipManager) {
     _tableConfig = tableConfig;
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _controllerLeadershipManager = controllerLeadershipManager;
   }
 
   public static class QuotaCheckerResponse {
@@ -213,6 +216,6 @@ public class StorageQuotaChecker {
   }
 
   protected boolean isLeader() {
-    return ControllerLeadershipManager.getInstance().isLeader();
+    return _controllerLeadershipManager.isLeader();
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 8153559..4f881ff 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
@@ -557,8 +558,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   System.currentTimeMillis());
           CommittingSegmentDescriptor committingSegmentDescriptor =
               new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0);
-          segmentManager.updateOldSegmentMetadataZNRecord(tableName, latestSegment,
-                  latestMetadata.getStartOffset() + 100, committingSegmentDescriptor);
+          segmentManager
+              .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
+                  committingSegmentDescriptor);
           segmentManager.createNewSegmentMetadataZNRecord(tableConfig, latestSegment, newLlcSegmentName,
               expectedPartitionAssignment, committingSegmentDescriptor, false);
 
@@ -633,7 +635,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
                 segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
             segmentManager
                 .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
-                        new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
+                    new CommittingSegmentDescriptor(latestSegment.getSegmentName(),
+                        latestMetadata.getStartOffset() + 100, 0));
             idealState = idealStateBuilder.setSegmentState(latestSegment.getSegmentName(), "ONLINE").build();
 
             // get old state
@@ -668,7 +671,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   segmentManager.getRealtimeSegmentZKMetadata(tableName, latestSegment.getSegmentName(), null);
               segmentManager
                   .updateOldSegmentMetadataZNRecord(tableName, latestSegment, latestMetadata.getStartOffset() + 100,
-                          new CommittingSegmentDescriptor(latestSegment.getSegmentName(), latestMetadata.getStartOffset() + 100, 0));
+                      new CommittingSegmentDescriptor(latestSegment.getSegmentName(),
+                          latestMetadata.getStartOffset() + 100, 0));
 
               // get old state
               nPartitions = expectedPartitionAssignment.getNumPartitions();
@@ -895,9 +899,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     segmentManager.IS_CONNECTED = false;
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-                segmentManager.newMockSegmentMetadata());
+    CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
     boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     Assert.assertFalse(status);
     Assert.assertEquals(segmentManager._nCallsToUpdateHelix, 0);  // Idealstate not updated
@@ -952,8 +955,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     Set<String> prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
+    CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
     boolean status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -976,8 +979,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-            segmentManager.newMockSegmentMetadata());
+    committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -996,8 +999,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
     // We do not expect the segment metadata to be used. Thus reuse the current metadata.
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-            segmentManager.getMockSegmentMetadata());
+    committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.getMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertFalse(status);
@@ -1013,8 +1016,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     segmentManager._records.clear();
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
     // We do not expect the segment metadata to be used. Thus reuse the current metadata.
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-            segmentManager.getMockSegmentMetadata());
+    committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.getMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertFalse(status);
@@ -1029,8 +1032,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     committingSegmentMetadata = new LLCRealtimeSegmentZKMetadata(newZnRec);
     prevInstances = idealState.getInstanceSet(committingSegmentMetadata.getSegmentName());
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    committingSegmentDescriptor = CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-            segmentManager.newMockSegmentMetadata());
+    committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager.newMockSegmentMetadata());
     status = segmentManager.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     segmentManager.verifyMetadataInteractions();
     Assert.assertTrue(status);
@@ -1099,9 +1102,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
         new LLCRealtimeSegmentZKMetadata(segmentManager2._records.get(committingPartition));
 
     reqParams.withSegmentName(committingSegmentMetadata.getSegmentName()).withOffset(nextOffset);
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(reqParams,
-                segmentManager1.newMockSegmentMetadata());
+    CommittingSegmentDescriptor committingSegmentDescriptor = CommittingSegmentDescriptor
+        .fromSegmentCompletionReqParamsAndMetadata(reqParams, segmentManager1.newMockSegmentMetadata());
     boolean status = segmentManager1.commitSegmentMetadata(rawTableName, committingSegmentDescriptor);
     Assert.assertTrue(status);  // Committing segment metadata succeeded.
 
@@ -1119,7 +1121,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
    * @throws InvalidConfigException
    */
   @Test
-  public void testIdealStateAlreadyUpdated() throws InvalidConfigException {
+  public void testIdealStateAlreadyUpdated()
+      throws InvalidConfigException {
     FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
     String tableNameWithType = "tableName_REALTIME";
     String rawTableName = "tableName";
@@ -1130,9 +1133,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     IdealState idealState = segmentManager._tableIdealState;
     TableConfig tableConfig = segmentManager._tableConfigStore.getTableConfig(tableNameWithType);
-    PartitionAssignment partitionAssignment =
-        segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
-            idealState);
+    PartitionAssignment partitionAssignment = segmentManager._partitionAssignmentGenerator
+        .getStreamPartitionAssignmentFromIdealState(tableConfig, idealState);
     int partitionId = 0;
     int seq = 0;
     IdealStateBuilderUtil idealStateBuilderUtil = new IdealStateBuilderUtil(idealState, tableNameWithType);
@@ -1165,7 +1167,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertEquals(idealState, idealStateCopy);
   }
 
-  private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName, int nPartitions, int nReplicas, int nInstances)
+  private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName,
+      int nPartitions, int nReplicas, int nInstances)
       throws InvalidConfigException {
 
     List<String> instances = getInstanceList(nInstances);
@@ -1338,9 +1341,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     private TableConfigStore _tableConfigStore;
 
-    protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
-
-      super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()));
+    protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments, HelixManager helixManager) {
+      super(null, clusterName, helixManager, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+          new ControllerLeadershipManager(helixManager));
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
         TableConfig mockTableConfig = mock(TableConfig.class);
@@ -1375,6 +1378,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _tableConfigStore = new TableConfigStore();
     }
 
+    protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
+      this(existingLLCSegments, mock(HelixManager.class));
+    }
+
     private SegmentMetadataImpl newMockSegmentMetadata() {
       segmentMetadata = mock(SegmentMetadataImpl.class);
       when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
@@ -1480,7 +1487,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
           partitionAssignment, committingSegmentDescriptor, isNewTableSetup);
     }
 
-
     @Override
     public LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String realtimeTableName, String segmentName,
         Stat stat) {
@@ -1500,7 +1506,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
       return metadata;
     }
 
-
     public void verifyMetadataInteractions() {
       verify(segmentMetadata, times(1)).getCrc();
       verify(segmentMetadata, times(2)).getTimeInterval();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 3b8503e..0dfe6b4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -64,7 +65,8 @@ public class SegmentCompletionTest {
 
   public void testCaseSetup(boolean isLeader, boolean isConnected)
       throws Exception {
-    segmentManager = new MockPinotLLCRealtimeSegmentManager();
+    segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
+    ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
     final int partitionId = 23;
     final int seqId = 12;
     final long now = System.currentTimeMillis();
@@ -76,7 +78,8 @@ public class SegmentCompletionTest {
     metadata.setNumReplicas(3);
     segmentManager._segmentMetadata = metadata;
 
-    segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+    segmentCompletionMgr =
+        new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
     segmentManager._segmentCompletionMgr = segmentCompletionMgr;
 
     Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -151,8 +154,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -221,8 +224,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -332,8 +335,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("doNotCommitMe");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have aborted
@@ -366,8 +369,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
     // And the FSM should be removed.
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -419,8 +422,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -469,8 +472,8 @@ public class SegmentCompletionTest {
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have disappeared from the map
@@ -529,8 +532,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -602,8 +605,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // We ask S2 to keep the segment
     params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
@@ -670,7 +673,7 @@ public class SegmentCompletionTest {
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // Now the FSM should have disappeared from the map
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
@@ -864,8 +867,8 @@ public class SegmentCompletionTest {
     long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
     Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
     segmentCompletionMgr._secconds += 55;
-    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
-            CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr
+        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // now FSM should be out of the map.
     Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
@@ -1137,9 +1140,11 @@ public class SegmentCompletionTest {
     private static final ControllerConf CONTROLLER_CONF = new ControllerConf();
     public LLCSegmentName _stoppedSegmentName;
     public String _stoppedInstance;
+    public HelixManager _helixManager = mock(HelixManager.class);
 
-    protected MockPinotLLCRealtimeSegmentManager() {
-      super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()));
+    protected MockPinotLLCRealtimeSegmentManager(boolean isLeader, boolean isConnected) {
+      super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+          new ControllerLeadershipManager(createMockHelixManager(isLeader, isConnected)));
     }
 
     @Override
@@ -1186,8 +1191,18 @@ public class SegmentCompletionTest {
 
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
         boolean isConnected) {
-      super(createMockHelixManager(isLeader, isConnected), segmentManager,
-          new ControllerMetrics(new MetricsRegistry()));
+      this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
+    }
+
+    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
+        boolean isConnected) {
+      this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
+    }
+
+    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+        boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
+      super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
+          controllerLeadershipManager);
       _isLeader = isLeader;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index f69a42c..ed38a50 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.mockito.invocation.InvocationOnMock;
@@ -48,6 +49,7 @@ public class StorageQuotaCheckerTest {
   private TableConfig _tableConfig;
   private ControllerMetrics _controllerMetrics;
   private PinotHelixResourceManager _pinotHelixResourceManager;
+  private ControllerLeadershipManager _controllerLeadershipManager;
   private QuotaConfig _quotaConfig;
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -60,6 +62,7 @@ public class StorageQuotaCheckerTest {
     _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
     TEST_DIR.mkdirs();
@@ -74,7 +77,8 @@ public class StorageQuotaCheckerTest {
   public void testNoQuota()
       throws InvalidConfigException {
     StorageQuotaChecker checker =
-        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+            _controllerLeadershipManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
     StorageQuotaChecker.QuotaCheckerResponse res =
         checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 1000);
@@ -85,7 +89,8 @@ public class StorageQuotaCheckerTest {
   public void testNoStorageQuotaConfig()
       throws InvalidConfigException {
     StorageQuotaChecker checker =
-        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+            _controllerLeadershipManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
     StorageQuotaChecker.QuotaCheckerResponse res =
@@ -128,7 +133,8 @@ public class StorageQuotaCheckerTest {
     when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
     when(_quotaConfig.getStorage()).thenReturn("3K");
     StorageQuotaChecker checker =
-        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+        new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
+            _controllerLeadershipManager);
     StorageQuotaChecker.QuotaCheckerResponse response =
         checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
@@ -177,8 +183,9 @@ public class StorageQuotaCheckerTest {
   private class MockStorageQuotaChecker extends StorageQuotaChecker {
 
     public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
-        ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
-      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager);
+        ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
+        ControllerLeadershipManager controllerLeadershipManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org