You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/18 21:10:00 UTC

[incubator-pinot] branch master updated: Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c3686  Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)
25c3686 is described below

commit 25c36869999ad12560ca3fa0a9c1319cfba5183c
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Thu Apr 18 14:09:56 2019 -0700

    Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager (#4102)
---
 .../apache/pinot/controller/ControllerStarter.java | 31 ++++++---
 .../resources/LLCSegmentCompletionHandlers.java    | 19 +++---
 .../helix/core/PinotHelixResourceManager.java      | 23 ++++---
 .../helix/core/PinotTableIdealStateBuilder.java    |  8 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 73 +++++++---------------
 .../core/realtime/SegmentCompletionManager.java    | 33 ++--------
 .../rebalance/RebalanceSegmentStrategyFactory.java | 27 +-------
 .../controller/helix/PinotControllerModeTest.java  | 11 ++++
 .../PinotLLCRealtimeSegmentManagerTest.java        | 61 +++++++++++-------
 .../helix/core/realtime/SegmentCompletionTest.java | 31 ++++-----
 10 files changed, 147 insertions(+), 170 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 4a900c6..fd3169c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -56,6 +56,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -108,6 +109,8 @@ public class ControllerStarter {
   private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+  private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+  private SegmentCompletionManager _segmentCompletionManager;
   private ControllerLeadershipManager _controllerLeadershipManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
@@ -259,8 +262,16 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-    PinotLLCRealtimeSegmentManager
-        .create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+
+    _pinotLLCRealtimeSegmentManager =
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
+            _controllerLeadershipManager);
+    // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
+    _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+    _segmentCompletionManager =
+        new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
+            _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+
     _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
     _realtimeSegmentsManager.start(_controllerMetrics);
 
@@ -270,8 +281,9 @@ public class ControllerStarter {
     _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
     _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
 
-    LOGGER.info("Creating rebalance segments factory");
-    RebalanceSegmentStrategyFactory.createInstance(helixParticipantManager);
+    LOGGER.info("Registering rebalance segments factory");
+    _helixResourceManager
+        .registerRebalanceSegmentStrategyFactory(new RebalanceSegmentStrategyFactory(helixParticipantManager));
 
     String accessControlFactoryClass = _config.getAccessControlFactoryClass();
     LOGGER.info("Use class: {} as the AccessControlFactory", accessControlFactoryClass);
@@ -298,6 +310,7 @@ public class ControllerStarter {
         bind(_config).to(ControllerConf.class);
         bind(_helixResourceManager).to(PinotHelixResourceManager.class);
         bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+        bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
         bind(_taskManager).to(PinotTaskManager.class);
         bind(connectionManager).to(HttpConnectionManager.class);
         bind(_executorService).to(Executor.class);
@@ -418,8 +431,9 @@ public class ControllerStarter {
         new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
             _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
-    _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
-        PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
+            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
         new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
@@ -460,7 +474,7 @@ public class ControllerStarter {
 
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
-      PinotLLCRealtimeSegmentManager.getInstance().stop();
+      _pinotLLCRealtimeSegmentManager.stop();
 
       LOGGER.info("Closing PinotFS classes");
       PinotFSFactory.shutdown();
@@ -474,9 +488,6 @@ public class ControllerStarter {
       LOGGER.info("Stopping resource manager");
       _helixResourceManager.stop();
 
-      LOGGER.info("Stopping rebalance segments factory");
-      RebalanceSegmentStrategyFactory.stop();
-
       _executorService.shutdownNow();
     } catch (final Exception e) {
       LOGGER.error("Caught exception while shutting down", e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 19e2123..bbd70b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -75,6 +75,9 @@ public class LLCSegmentCompletionHandlers {
   @Inject
   ControllerConf _controllerConf;
 
+  @Inject
+  SegmentCompletionManager _segmentCompletionManager;
+
   @VisibleForTesting
   public static String getScheme() {
     return SCHEME;
@@ -104,7 +107,7 @@ public class LLCSegmentCompletionHandlers {
         .withExtraTimeSec(extraTimeSec);
     LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
 
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().extendBuildTime(requestParams);
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams);
 
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to extendBuildTime:{}", responseStr);
@@ -130,7 +133,7 @@ public class LLCSegmentCompletionHandlers {
         .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
     LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
 
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().segmentConsumed(requestParams);
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentConsumed:{}", responseStr);
     return responseStr;
@@ -153,7 +156,7 @@ public class LLCSegmentCompletionHandlers {
     LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response =
-        SegmentCompletionManager.getInstance().segmentStoppedConsuming(requestParams);
+        _segmentCompletionManager.segmentStoppedConsuming(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
     return responseStr;
@@ -183,7 +186,7 @@ public class LLCSegmentCompletionHandlers {
     LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response =
-        SegmentCompletionManager.getInstance().segmentCommitStart(requestParams);
+        _segmentCompletionManager.segmentCommitStart(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentCommitStart:{}", responseStr);
     return responseStr;
@@ -230,7 +233,7 @@ public class LLCSegmentCompletionHandlers {
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
         CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
@@ -255,7 +258,7 @@ public class LLCSegmentCompletionHandlers {
         .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
     LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
 
-    final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
+    final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager;
     SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -297,7 +300,7 @@ public class LLCSegmentCompletionHandlers {
               // check for existing segment file and remove it. So, the block cannot be removed altogether.
               // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
               // be used.
-              synchronized (SegmentCompletionManager.getInstance()) {
+              synchronized (_segmentCompletionManager) {
                 if (pinotFS.exists(segmentFileURI)) {
                   LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
                       segmentFileURI.toString(), instanceId, segmentName);
@@ -406,7 +409,7 @@ public class LLCSegmentCompletionHandlers {
       LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
             CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
     final String responseStr = response.toJsonString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a13fab3..ef94094 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -138,6 +138,8 @@ public class PinotHelixResourceManager {
   private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
   private Builder _keyBuilder;
   private SegmentDeletionManager _segmentDeletionManager;
+  private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+  private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@@ -1131,13 +1133,11 @@ public class PinotHelixResourceManager {
   @VisibleForTesting
   protected void validateTableTenantConfig(TableConfig tableConfig, String tableNameWithType, TableType tableType) {
     if (tableConfig == null) {
-      throw new InvalidTableConfigException(
-          "Table config is null for table: " + tableNameWithType);
+      throw new InvalidTableConfigException("Table config is null for table: " + tableNameWithType);
     }
     TenantConfig tenantConfig = tableConfig.getTenantConfig();
     if (tenantConfig == null || tenantConfig.getBroker() == null || tenantConfig.getServer() == null) {
-      throw new InvalidTableConfigException(
-          "Tenant is not configured for table: " + tableNameWithType);
+      throw new InvalidTableConfigException("Tenant is not configured for table: " + tableNameWithType);
     }
     // Check if tenant exists before creating the table
     String brokerTenantName = TagNameUtils.getBrokerTagForTenant(tenantConfig.getBroker());
@@ -1236,6 +1236,14 @@ public class PinotHelixResourceManager {
     }
   }
 
+  public void registerPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+    _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+  }
+
+  public void registerRebalanceSegmentStrategyFactory(RebalanceSegmentStrategyFactory rebalanceSegmentStrategyFactory) {
+    _rebalanceSegmentStrategyFactory = rebalanceSegmentStrategyFactory;
+  }
+
   private void ensureRealtimeClusterIsSetUp(TableConfig config, String realtimeTableName,
       IndexingConfig indexingConfig) {
     StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
@@ -1255,7 +1263,7 @@ public class PinotHelixResourceManager {
         // Only high-level consumer specified in the config.
         createHelixEntriesForHighLevelConsumer(config, realtimeTableName, idealState);
         // Clean up any LLC table if they are present
-        PinotLLCRealtimeSegmentManager.getInstance().cleanupLLC(realtimeTableName);
+        _pinotLLCRealtimeSegmentManager.cleanupLLC(realtimeTableName);
       }
     }
 
@@ -1265,7 +1273,8 @@ public class PinotHelixResourceManager {
       // (unless there are low-level segments already present)
       if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
         PinotTableIdealStateBuilder
-            .buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState, _enableBatchMessageMode);
+            .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, config, idealState,
+                _enableBatchMessageMode);
         LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
       } else {
         LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
@@ -2234,7 +2243,7 @@ public class PinotHelixResourceManager {
     RebalanceResult result;
     try {
       RebalanceSegmentStrategy rebalanceSegmentsStrategy =
-          RebalanceSegmentStrategyFactory.getInstance().getRebalanceSegmentsStrategy(tableConfig);
+          _rebalanceSegmentStrategyFactory.getRebalanceSegmentsStrategy(tableConfig);
       result = _tableRebalancer.rebalance(tableConfig, rebalanceSegmentsStrategy, rebalanceUserConfig);
     } catch (InvalidConfigException e) {
       LOGGER.error("Exception in rebalancing config for table {}", tableNameWithType, e);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 0e311e6..46eb8e4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -118,8 +118,9 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
-  public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
-      IdealState idealState, boolean enableBatchMessageMode) {
+  public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+      String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+      boolean enableBatchMessageMode) {
 
     // Validate replicasPerPartition here.
     final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -136,9 +137,8 @@ public class PinotTableIdealStateBuilder {
     if (idealState == null) {
       idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
     }
-    final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance();
     try {
-      segmentManager.setupNewTable(realtimeTableConfig, idealState);
+      pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
     } catch (InvalidConfigException e) {
       throw new IllegalStateException("Caught exception when creating table " + realtimeTableName, e);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 324b9ae..c833261 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,9 +111,6 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
 
-  // TODO: fix the misuse of singleton.
-  private static PinotLLCRealtimeSegmentManager INSTANCE = null;
-
   private final HelixAdmin _helixAdmin;
   private final HelixManager _helixManager;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -131,6 +128,27 @@ public class PinotLLCRealtimeSegmentManager {
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
+  public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+    _helixAdmin = helixResourceManager.getHelixAdmin();
+    _helixManager = helixResourceManager.getHelixZkManager();
+    _propertyStore = helixResourceManager.getPropertyStore();
+    _helixResourceManager = helixResourceManager;
+    _clusterName = helixResourceManager.getHelixClusterName();
+    _controllerConf = controllerConf;
+    _controllerMetrics = controllerMetrics;
+    _numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
+    _idealstateUpdateLocks = new Lock[_numIdealStateUpdateLocks];
+    for (int i = 0; i < _numIdealStateUpdateLocks; i++) {
+      _idealstateUpdateLocks[i] = new ReentrantLock();
+    }
+    _tableConfigCache = new TableConfigCache(_propertyStore);
+    _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
+    _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
+    _controllerLeadershipManager = controllerLeadershipManager;
+  }
+
+
   public boolean getIsSplitCommitEnabled() {
     return _controllerConf.getAcceptSplitCommit();
   }
@@ -139,25 +157,6 @@ public class PinotLLCRealtimeSegmentManager {
     return _controllerConf.generateVipUrl();
   }
 
-  public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
-        helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
-        controllerConf, controllerMetrics, controllerLeadershipManager);
-  }
-
-  private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
-      ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    if (INSTANCE != null) {
-      throw new RuntimeException("Instance already created");
-    }
-    INSTANCE =
-        new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
-            controllerConf, controllerMetrics, controllerLeadershipManager);
-    SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
-  }
-
   public void stop() {
     _isStopping = true;
     LOGGER
@@ -181,36 +180,6 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
-    INSTANCE = null;
-    SegmentCompletionManager.stop();
-  }
-
-  protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
-      ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    _helixAdmin = helixAdmin;
-    _helixManager = helixManager;
-    _propertyStore = propertyStore;
-    _helixResourceManager = helixResourceManager;
-    _clusterName = clusterName;
-    _controllerConf = controllerConf;
-    _controllerMetrics = controllerMetrics;
-    _numIdealStateUpdateLocks = controllerConf.getRealtimeSegmentMetadataCommitNumLocks();
-    _idealstateUpdateLocks = new Lock[_numIdealStateUpdateLocks];
-    for (int i = 0; i < _numIdealStateUpdateLocks; i++) {
-      _idealstateUpdateLocks[i] = new ReentrantLock();
-    }
-    _tableConfigCache = new TableConfigCache(_propertyStore);
-    _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
-    _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _controllerLeadershipManager = controllerLeadershipManager;
-  }
-
-  public static PinotLLCRealtimeSegmentManager getInstance() {
-    if (INSTANCE == null) {
-      throw new RuntimeException("Not yet created");
-    }
-    return INSTANCE;
   }
 
   protected boolean isLeader() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 618c4a6..cfdd3c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,9 +64,6 @@ public class SegmentCompletionManager {
     ABORTED,      // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
   }
 
-  // TODO: fix the misuse of singleton.
-  private static SegmentCompletionManager _instance = null;
-
   private final HelixManager _helixManager;
   // A map that holds the FSM for each segment.
   private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>();
@@ -84,12 +81,15 @@ public class SegmentCompletionManager {
 
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
-  protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+  public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+      int segmentCommitTimeoutSeconds) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
     _controllerLeadershipManager = controllerLeadershipManager;
+    SegmentCompletionProtocol
+        .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
   }
 
   public boolean isSplitCommitEnabled() {
@@ -100,25 +100,6 @@ public class SegmentCompletionManager {
     return _segmentManager.getControllerVipUrl();
   }
 
-  public static SegmentCompletionManager create(HelixManager helixManager,
-      PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    if (_instance != null) {
-      throw new RuntimeException("Cannot create multiple instances");
-    }
-    _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
-    SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
-        TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
-    return _instance;
-  }
-
-  public static SegmentCompletionManager getInstance() {
-    if (_instance == null) {
-      throw new RuntimeException("Not yet created");
-    }
-    return _instance;
-  }
-
   protected long getCurrentTimeMs() {
     return System.currentTimeMillis();
   }
@@ -1118,10 +1099,6 @@ public class SegmentCompletionManager {
     }
   }
 
-  public static void stop() {
-    _instance = null;
-  }
-
   @VisibleForTesting
   protected boolean isLeader() {
     return _controllerLeadershipManager.isLeader();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
index 30ac8af..7fe6ae8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSegmentStrategyFactory.java
@@ -24,35 +24,18 @@ import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy
 
 
 /**
- * Singleton factory class, to fetch the right rebalance segments strategy based on table config
+ * This class is used to fetch the right rebalance segments strategy based on table config.
  */
 public class RebalanceSegmentStrategyFactory {
 
-  // TODO: fix the misuse of singleton.
-  private static RebalanceSegmentStrategyFactory INSTANCE = null;
-
   private HelixManager _helixManager;
 
-  private RebalanceSegmentStrategyFactory(HelixManager helixManager) {
+  public RebalanceSegmentStrategyFactory(HelixManager helixManager) {
     _helixManager = helixManager;
   }
 
-  public static void createInstance(HelixManager helixManager) {
-    if (INSTANCE != null) {
-      throw new RuntimeException("Instance already created for " + RebalanceSegmentStrategyFactory.class.getName());
-    }
-    INSTANCE = new RebalanceSegmentStrategyFactory(helixManager);
-  }
-
-  public static RebalanceSegmentStrategyFactory getInstance() {
-    if (INSTANCE == null) {
-      throw new RuntimeException("Instance not yet created for " + RebalanceSegmentStrategyFactory.class.getName());
-    }
-    return INSTANCE;
-  }
-
   public RebalanceSegmentStrategy getRebalanceSegmentsStrategy(TableConfig tableConfig) {
-    // If we use replica group segment assignment strategy, we pick the replica group rebalancer
+    // If we use replica group segment assignment strategy, we pick the replica group rebalancer.
     String segmentAssignmentStrategy = tableConfig.getValidationConfig().getSegmentAssignmentStrategy();
     if (segmentAssignmentStrategy == null) {
       return new DefaultRebalanceSegmentStrategy(_helixManager);
@@ -64,8 +47,4 @@ public class RebalanceSegmentStrategyFactory {
         return new DefaultRebalanceSegmentStrategy(_helixManager);
     }
   }
-
-  public static void stop() {
-    INSTANCE = null;
-  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index 9bf70bd..4de3034 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -103,6 +103,17 @@ public class PinotControllerModeTest extends ControllerTest {
         "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
     Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
 
+    // Start a second Pinot only controller.
+    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+    ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config);
+
+    secondControllerStarter.start();
+    // Two controller instances assigned to cluster.
+    TestUtils.waitForCondition(aVoid -> _helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
+        "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms.");
+
+    secondControllerStarter.stop();
+
     stopController();
     _controllerStarter = null;
     helixControllerStarter.stop();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 4f881ff..fd860c3 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -58,6 +58,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerLeadershipManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
@@ -89,6 +90,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   private static File baseDir;
   private Random random;
 
+  private PinotHelixResourceManager _mockPinotHelixResourceManager;
+
   private enum ExternalChange {
     N_INSTANCES_CHANGED, N_PARTITIONS_INCREASED, N_INSTANCES_CHANGED_AND_PARTITIONS_INCREASED
   }
@@ -119,6 +122,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
     FakePinotLLCRealtimeSegmentManager.IS_CONNECTED = true;
     FakePinotLLCRealtimeSegmentManager.IS_LEADER = true;
+
+    _mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    HelixManager mockHelixManager = mock(HelixManager.class);
+    when(_mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(mockHelixManager);
   }
 
   @AfterTest
@@ -193,7 +200,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
   private void testSetupNewTable(TableConfig tableConfig, IdealState idealState, int nPartitions, int nReplicas,
       List<String> instances, boolean invalidConfig, boolean badStream) {
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
     segmentManager.addTableToStore(tableConfig.getTableName(), tableConfig, nPartitions);
 
@@ -281,7 +289,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   @Test
   public void testValidateLLCPartitionIncrease() {
 
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
     String tableName = "validateThisTable_REALTIME";
     int nReplicas = 2;
     IdealStateBuilderUtil idealStateBuilder = new IdealStateBuilderUtil(tableName);
@@ -470,7 +479,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testValidateLLCRepair()
       throws InvalidConfigException {
 
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
     String tableName = "repairThisTable_REALTIME";
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
     int nReplicas = 2;
@@ -855,7 +865,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     LLCSegmentName existingSegmentName = new LLCSegmentName("someTable", 1, 31, 12355L);
     String[] existingSegs = {existingSegmentName.getSegmentName()};
     FakePinotLLCRealtimeSegmentManager segmentManager =
-        new FakePinotLLCRealtimeSegmentManager(Arrays.asList(existingSegs));
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Arrays.asList(existingSegs));
 
     final String rtTableName = "testPreExistingLLCSegments_REALTIME";
 
@@ -874,7 +884,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   @Test
   public void testCommittingSegmentIfDisconnected()
       throws InvalidConfigException {
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
 
     final String tableName = "table_REALTIME";
     final String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -926,7 +937,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   @Test
   public void testCommittingSegment()
       throws InvalidConfigException {
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
 
     final String rtTableName = "table_REALTIME";
     final String rawTableName = TableNameBuilder.extractRawTableName(rtTableName);
@@ -1083,11 +1095,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void testCommitSegmentWhenControllerWentThroughGC()
       throws InvalidConfigException {
 
-    FakePinotLLCRealtimeSegmentManager segmentManager1 = new FakePinotLLCRealtimeSegmentManager(null);
-    FakePinotLLCRealtimeSegmentManager segmentManager2 = new FakePinotLLCRealtimeSegmentManagerII(null,
-        FakePinotLLCRealtimeSegmentManagerII.SCENARIO_1_ZK_VERSION_NUM_HAS_CHANGE);
-    FakePinotLLCRealtimeSegmentManager segmentManager3 = new FakePinotLLCRealtimeSegmentManagerII(null,
-        FakePinotLLCRealtimeSegmentManagerII.SCENARIO_2_METADATA_STATUS_HAS_CHANGE);
+    FakePinotLLCRealtimeSegmentManager segmentManager1 =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
+    FakePinotLLCRealtimeSegmentManager segmentManager2 =
+        new FakePinotLLCRealtimeSegmentManagerII(_mockPinotHelixResourceManager, null,
+            FakePinotLLCRealtimeSegmentManagerII.SCENARIO_1_ZK_VERSION_NUM_HAS_CHANGE);
+    FakePinotLLCRealtimeSegmentManager segmentManager3 =
+        new FakePinotLLCRealtimeSegmentManagerII(_mockPinotHelixResourceManager, null,
+            FakePinotLLCRealtimeSegmentManagerII.SCENARIO_2_METADATA_STATUS_HAS_CHANGE);
 
     final String rtTableName = "table_REALTIME";
     final String rawTableName = TableNameBuilder.extractRawTableName(rtTableName);
@@ -1123,7 +1138,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
   @Test
   public void testIdealStateAlreadyUpdated()
       throws InvalidConfigException {
-    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, null);
     String tableNameWithType = "tableName_REALTIME";
     String rawTableName = "tableName";
     int nPartitions = 4;
@@ -1199,7 +1215,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       throws Exception {
     PinotFSFactory.init(new PropertiesConfiguration());
     PinotLLCRealtimeSegmentManager realtimeSegmentManager =
-        new FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Collections.<String>emptyList());
     String tableName = "fakeTable_REALTIME";
     String segmentName = "segment";
     String temporarySegmentLocation = SegmentCompletionUtils.generateSegmentFileName(segmentName);
@@ -1219,7 +1235,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       throws Exception {
     PinotFSFactory.init(new PropertiesConfiguration());
     PinotLLCRealtimeSegmentManager realtimeSegmentManager =
-        new FakePinotLLCRealtimeSegmentManager(Collections.<String>emptyList());
+        new FakePinotLLCRealtimeSegmentManager(_mockPinotHelixResourceManager, Collections.<String>emptyList());
     String tableName = "fakeTable_REALTIME";
     String segmentName = "segment";
 
@@ -1341,9 +1357,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     private TableConfigStore _tableConfigStore;
 
-    protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments, HelixManager helixManager) {
-      super(null, clusterName, helixManager, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
-          new ControllerLeadershipManager(helixManager));
+    protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
+        List<String> existingLLCSegments) {
+      super(pinotHelixResourceManager, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager()));
+
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
         TableConfig mockTableConfig = mock(TableConfig.class);
@@ -1378,10 +1396,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
       _tableConfigStore = new TableConfigStore();
     }
 
-    protected FakePinotLLCRealtimeSegmentManager(List<String> existingLLCSegments) {
-      this(existingLLCSegments, mock(HelixManager.class));
-    }
-
     private SegmentMetadataImpl newMockSegmentMetadata() {
       segmentMetadata = mock(SegmentMetadataImpl.class);
       when(segmentMetadata.getCrc()).thenReturn(FakePinotLLCRealtimeSegmentManager.CRC);
@@ -1574,8 +1588,9 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     private int _scenario;
 
-    FakePinotLLCRealtimeSegmentManagerII(List<String> existingLLCSegments, int scenario) {
-      super(existingLLCSegments);
+    FakePinotLLCRealtimeSegmentManagerII(PinotHelixResourceManager pinotHelixResourceManager,
+        List<String> existingLLCSegments, int scenario) {
+      super(pinotHelixResourceManager, existingLLCSegments);
       _scenario = scenario;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 0dfe6b4..e13fc87 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
 import org.testng.Assert;
@@ -39,6 +40,7 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -65,8 +67,10 @@ public class SegmentCompletionTest {
 
   public void testCaseSetup(boolean isLeader, boolean isConnected)
       throws Exception {
-    segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
-    ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
+    PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
+    HelixManager mockHelixManager = createMockHelixManager(isLeader, isConnected);
+    when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(mockHelixManager);
+    segmentManager = new MockPinotLLCRealtimeSegmentManager(mockPinotHelixResourceManager);
     final int partitionId = 23;
     final int seqId = 12;
     final long now = System.currentTimeMillis();
@@ -78,8 +82,7 @@ public class SegmentCompletionTest {
     metadata.setNumReplicas(3);
     segmentManager._segmentMetadata = metadata;
 
-    segmentCompletionMgr =
-        new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+    segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
     segmentManager._segmentCompletionMgr = segmentCompletionMgr;
 
     Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -1142,9 +1145,9 @@ public class SegmentCompletionTest {
     public String _stoppedInstance;
     public HelixManager _helixManager = mock(HelixManager.class);
 
-    protected MockPinotLLCRealtimeSegmentManager(boolean isLeader, boolean isConnected) {
-      super(null, clusterName, null, null, null, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
-          new ControllerLeadershipManager(createMockHelixManager(isLeader, isConnected)));
+    protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager) {
+      super(pinotHelixResourceManager, CONTROLLER_CONF, new ControllerMetrics(new MetricsRegistry()),
+          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager()));
     }
 
     @Override
@@ -1191,18 +1194,18 @@ public class SegmentCompletionTest {
 
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
         boolean isConnected) {
-      this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
+      this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader);
     }
 
-    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
-        boolean isConnected) {
-      this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
+    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+        boolean isLeader) {
+      this(helixManager, segmentManager, isLeader, new ControllerLeadershipManager(helixManager));
     }
 
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-        boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
-      super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
-          controllerLeadershipManager);
+        boolean isLeader, ControllerLeadershipManager controllerLeadershipManager) {
+      super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()), controllerLeadershipManager,
+          SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
       _isLeader = isLeader;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org