You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/04/10 23:19:36 UTC

[incubator-pinot] 01/01: Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch clean-up-singleton
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4b5df6ccd1ccd4febb19f8601cc78ae3c6fe5130
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Wed Apr 10 16:16:51 2019 -0700

    Remove singleton for PinotLLCRealtimeSegmentManager and SegmentCompletionManager
---
 .../apache/pinot/controller/ControllerStarter.java |  51 +++--
 .../resources/LLCSegmentCompletionHandlers.java    |  19 +-
 .../helix/core/PinotHelixResourceManager.java      | 124 ++++++------
 .../helix/core/PinotTableIdealStateBuilder.java    |  27 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  28 +--
 .../core/realtime/SegmentCompletionManager.java    |  71 +++----
 .../controller/helix/PinotControllerModeTest.java  |   9 +
 .../helix/core/realtime/SegmentCompletionTest.java | 214 +++++++++++----------
 8 files changed, 265 insertions(+), 278 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 10c1c00..0e59158 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -55,6 +55,7 @@ import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTaskScheduler;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.realtime.PinotRealtimeSegmentManager;
+import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import org.apache.pinot.controller.helix.core.retention.RetentionManager;
@@ -107,6 +108,8 @@ public class ControllerStarter {
   private ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotRealtimeSegmentManager _realtimeSegmentsManager;
+  private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+  private SegmentCompletionManager _segmentCompletionManager;
   private ControllerLeadershipManager _controllerLeadershipManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
@@ -191,15 +194,16 @@ public class ControllerStarter {
         LOGGER.error("Invalid mode: " + _controllerMode);
     }
 
-    ServiceStatus.setServiceStatusCallback(new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+    ServiceStatus.setServiceStatusCallback(
+        new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
     _controllerMetrics.initializeGlobalMeters();
   }
 
   private void setUpHelixController() {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
-    _helixControllerManager = HelixSetupUtils
-        .setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel, _enableBatchMessageMode);
+    _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId, _isUpdateStateModel,
+        _enableBatchMessageMode);
 
     // Emit helix controller metrics
     _controllerMetrics.addCallbackGauge("helix.connected", () -> _helixControllerManager.isConnected() ? 1L : 0L);
@@ -241,7 +245,15 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-    PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics, _controllerLeadershipManager);
+
+    _pinotLLCRealtimeSegmentManager =
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
+            _controllerLeadershipManager);
+    // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
+    _helixResourceManager.setPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
+    _segmentCompletionManager = new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics, _config.getSegmentCommitTimeoutSeconds(), _controllerLeadershipManager);
+
+
     _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _controllerLeadershipManager);
     _realtimeSegmentsManager.start(_controllerMetrics);
 
@@ -279,6 +291,7 @@ public class ControllerStarter {
         bind(_config).to(ControllerConf.class);
         bind(_helixResourceManager).to(PinotHelixResourceManager.class);
         bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+        bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
         bind(_taskManager).to(PinotTaskManager.class);
         bind(connectionManager).to(HttpConnectionManager.class);
         bind(_executorService).to(Executor.class);
@@ -400,7 +413,7 @@ public class ControllerStarter {
             _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
-        PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+        _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
         new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
@@ -414,18 +427,18 @@ public class ControllerStarter {
   }
 
   public void stop() {
-      switch (_controllerMode) {
-        case DUAL:
-          stopPinotController();
-          stopHelixController();
-          break;
-        case PINOT_ONLY:
-          stopPinotController();
-          break;
-        case HELIX_ONLY:
-          stopHelixController();
-          break;
-      }
+    switch (_controllerMode) {
+      case DUAL:
+        stopPinotController();
+        stopHelixController();
+        break;
+      case PINOT_ONLY:
+        stopPinotController();
+        break;
+      case HELIX_ONLY:
+        stopHelixController();
+        break;
+    }
   }
 
   private void stopHelixController() {
@@ -441,7 +454,7 @@ public class ControllerStarter {
 
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
-      PinotLLCRealtimeSegmentManager.getInstance().stop();
+      _pinotLLCRealtimeSegmentManager.stop();
 
       LOGGER.info("Closing PinotFS classes");
       PinotFSFactory.shutdown();
@@ -465,7 +478,7 @@ public class ControllerStarter {
   }
 
   public boolean isPinotOnlyModeSupported() {
-    return false;
+    return true;
   }
 
   public MetricsRegistry getMetricsRegistry() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 19e2123..bbd70b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -75,6 +75,9 @@ public class LLCSegmentCompletionHandlers {
   @Inject
   ControllerConf _controllerConf;
 
+  @Inject
+  SegmentCompletionManager _segmentCompletionManager;
+
   @VisibleForTesting
   public static String getScheme() {
     return SCHEME;
@@ -104,7 +107,7 @@ public class LLCSegmentCompletionHandlers {
         .withExtraTimeSec(extraTimeSec);
     LOGGER.info("Processing extendBuildTime:{}", requestParams.toString());
 
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().extendBuildTime(requestParams);
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager.extendBuildTime(requestParams);
 
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to extendBuildTime:{}", responseStr);
@@ -130,7 +133,7 @@ public class LLCSegmentCompletionHandlers {
         .withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
     LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());
 
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance().segmentConsumed(requestParams);
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentConsumed(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentConsumed:{}", responseStr);
     return responseStr;
@@ -153,7 +156,7 @@ public class LLCSegmentCompletionHandlers {
     LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response =
-        SegmentCompletionManager.getInstance().segmentStoppedConsuming(requestParams);
+        _segmentCompletionManager.segmentStoppedConsuming(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentStoppedConsuming:{}", responseStr);
     return responseStr;
@@ -183,7 +186,7 @@ public class LLCSegmentCompletionHandlers {
     LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
 
     SegmentCompletionProtocol.Response response =
-        SegmentCompletionManager.getInstance().segmentCommitStart(requestParams);
+        _segmentCompletionManager.segmentCommitStart(requestParams);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentCommitStart:{}", responseStr);
     return responseStr;
@@ -230,7 +233,7 @@ public class LLCSegmentCompletionHandlers {
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
         CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata);
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit, committingSegmentDescriptor);
     final String responseStr = response.toJsonString();
     LOGGER.info("Response to segmentCommitEnd:{}", responseStr);
@@ -255,7 +258,7 @@ public class LLCSegmentCompletionHandlers {
         .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
     LOGGER.info("Processing segmentCommit:{}", requestParams.toString());
 
-    final SegmentCompletionManager segmentCompletionManager = SegmentCompletionManager.getInstance();
+    final SegmentCompletionManager segmentCompletionManager = _segmentCompletionManager;
     SegmentCompletionProtocol.Response response = segmentCompletionManager.segmentCommitStart(requestParams);
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -297,7 +300,7 @@ public class LLCSegmentCompletionHandlers {
               // check for existing segment file and remove it. So, the block cannot be removed altogether.
               // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
               // be used.
-              synchronized (SegmentCompletionManager.getInstance()) {
+              synchronized (_segmentCompletionManager) {
                 if (pinotFS.exists(segmentFileURI)) {
                   LOGGER.warn("Segment file {} exists. Replacing with upload from {} for segment {}",
                       segmentFileURI.toString(), instanceId, segmentName);
@@ -406,7 +409,7 @@ public class LLCSegmentCompletionHandlers {
       LOGGER.error("Segment metadata extraction failure for segment {}", segmentName);
       return SegmentCompletionProtocol.RESP_FAILED.toJsonString();
     }
-    SegmentCompletionProtocol.Response response = SegmentCompletionManager.getInstance()
+    SegmentCompletionProtocol.Response response = _segmentCompletionManager
         .segmentCommitEnd(requestParams, isSuccess, isSplitCommit,
             CommittingSegmentDescriptor.fromSegmentCompletionReqParamsAndMetadata(requestParams, segmentMetadata));
     final String responseStr = response.toJsonString();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1ad19fc..4216554 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -137,6 +137,7 @@ public class PinotHelixResourceManager {
   private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor;
   private Builder _keyBuilder;
   private SegmentDeletionManager _segmentDeletionManager;
+  private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableRebalancer _tableRebalancer;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
@@ -253,9 +254,8 @@ public class PinotHelixResourceManager {
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
   private HelixManager registerAndConnectAsHelixParticipant() {
-    HelixManager helixManager = HelixManagerFactory
-        .getZKHelixManager(_helixClusterName, CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
-            InstanceType.PARTICIPANT, _helixZkURL);
+    HelixManager helixManager = HelixManagerFactory.getZKHelixManager(_helixClusterName,
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
     try {
       helixManager.connect();
       return helixManager;
@@ -525,8 +525,8 @@ public class PinotHelixResourceManager {
               // Segment is in error and we don't consider error state as different from target, therefore continue
             } else {
               // Will try to read data every 500 ms, only if external view not updated.
-              Uninterruptibles
-                  .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+              Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS,
+                  TimeUnit.MILLISECONDS);
               continue deadlineLoop;
             }
           }
@@ -536,8 +536,8 @@ public class PinotHelixResourceManager {
         return true;
       } else {
         // Segment doesn't exist in EV, wait for a little bit
-        Uninterruptibles
-            .sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+        Uninterruptibles.sleepUninterruptibly(DEFAULT_EXTERNAL_VIEW_UPDATE_RETRY_INTERVAL_MILLIS,
+            TimeUnit.MILLISECONDS);
       }
     }
 
@@ -579,8 +579,7 @@ public class PinotHelixResourceManager {
     return PinotResourceManagerResponse.SUCCESS;
   }
 
-  public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType)
-      throws Exception {
+  public PinotResourceManagerResponse rebuildBrokerResourceFromHelixTags(String tableNameWithType) throws Exception {
     TableConfig tableConfig;
     try {
       tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
@@ -602,8 +601,8 @@ public class PinotHelixResourceManager {
     IdealState brokerIdealState = HelixHelper.getBrokerIdealStates(_helixAdmin, _helixClusterName);
     Set<String> brokerInstancesInIdealState = brokerIdealState.getInstanceSet(tableNameWithType);
     if (brokerInstancesInIdealState.equals(brokerInstances)) {
-      return PinotResourceManagerResponse
-          .success("Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
+      return PinotResourceManagerResponse.success(
+          "Broker resource is not rebuilt because ideal state is the same for table: " + tableNameWithType);
     }
 
     // Update ideal state with the new broker instances
@@ -639,8 +638,8 @@ public class PinotHelixResourceManager {
         tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE);
       }
     }
-    _helixAdmin
-        .setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+    _helixAdmin.setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
+        tableIdealState);
   }
 
   private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag,
@@ -788,8 +787,8 @@ public class PinotHelixResourceManager {
   public boolean isServerTenantDeletable(String tenantName) {
     Set<String> taggedInstances = new HashSet<>(
         HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getOfflineTagForTenant(tenantName)));
-    taggedInstances
-        .addAll(HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName)));
+    taggedInstances.addAll(
+        HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getRealtimeTagForTenant(tenantName)));
     for (String resourceName : getAllResources()) {
       if (!TableNameBuilder.isTableResource(resourceName)) {
         continue;
@@ -812,9 +811,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
+        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals(
+            CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
           continue;
         }
         TenantRole tenantRole;
@@ -838,9 +836,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
+        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag.equals(
+            CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag.equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
           continue;
         }
         TenantRole tenantRole;
@@ -1027,9 +1024,8 @@ public class PinotHelixResourceManager {
   }
 
   public List<String> getSchemaNames() {
-    return _propertyStore
-        .getChildNames(PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(),
-            AccessOption.PERSISTENT);
+    return _propertyStore.getChildNames(
+        PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT);
   }
 
   /**
@@ -1037,8 +1033,7 @@ public class PinotHelixResourceManager {
    * @throws InvalidTableConfigException
    * @throws TableAlreadyExistsException for offline tables only if the table already exists
    */
-  public void addTable(@Nonnull TableConfig tableConfig)
-      throws IOException {
+  public void addTable(@Nonnull TableConfig tableConfig) throws IOException {
     final String tableNameWithType = tableConfig.getTableName();
 
     TenantConfig tenantConfig;
@@ -1107,9 +1102,8 @@ public class PinotHelixResourceManager {
         }
         // now lets build an ideal state
         LOGGER.info("building empty ideal state for table : " + tableNameWithType);
-        final IdealState offlineIdealState = PinotTableIdealStateBuilder
-            .buildEmptyIdealStateFor(tableNameWithType, Integer.parseInt(segmentsConfig.getReplication()),
-                _enableBatchMessageMode);
+        final IdealState offlineIdealState = PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType,
+            Integer.parseInt(segmentsConfig.getReplication()), _enableBatchMessageMode);
         LOGGER.info("adding table via the admin");
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, offlineIdealState);
         LOGGER.info("successfully added the table : " + tableNameWithType + " to the cluster");
@@ -1215,6 +1209,10 @@ public class PinotHelixResourceManager {
     }
   }
 
+  public void setPinotLLCRealtimeSegmentManager(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+    _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+  }
+
   private void ensureRealtimeClusterIsSetUp(TableConfig config, String realtimeTableName,
       IndexingConfig indexingConfig) {
     StreamConfig streamConfig = new StreamConfig(indexingConfig.getStreamConfigs());
@@ -1234,7 +1232,7 @@ public class PinotHelixResourceManager {
         // Only high-level consumer specified in the config.
         createHelixEntriesForHighLevelConsumer(config, realtimeTableName, idealState);
         // Clean up any LLC table if they are present
-        PinotLLCRealtimeSegmentManager.getInstance().cleanupLLC(realtimeTableName);
+        _pinotLLCRealtimeSegmentManager.cleanupLLC(realtimeTableName);
       }
     }
 
@@ -1243,8 +1241,8 @@ public class PinotHelixResourceManager {
       // Will either create idealstate entry, or update the IS entry with new segments
       // (unless there are low-level segments already present)
       if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
-        PinotTableIdealStateBuilder
-            .buildLowLevelRealtimeIdealStateFor(realtimeTableName, config, idealState, _enableBatchMessageMode);
+        PinotTableIdealStateBuilder.buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager,
+            realtimeTableName, config, idealState, _enableBatchMessageMode);
         LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
       } else {
         LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
@@ -1255,9 +1253,8 @@ public class PinotHelixResourceManager {
   private void createHelixEntriesForHighLevelConsumer(TableConfig config, String realtimeTableName,
       IdealState idealState) {
     if (idealState == null) {
-      idealState = PinotTableIdealStateBuilder
-          .buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config, _helixZkManager, _propertyStore,
-              _enableBatchMessageMode);
+      idealState = PinotTableIdealStateBuilder.buildInitialHighLevelRealtimeIdealStateFor(realtimeTableName, config,
+          _helixZkManager, _propertyStore, _enableBatchMessageMode);
       LOGGER.info("Adding helix resource with empty HLC IdealState for {}", realtimeTableName);
       _helixAdmin.addResource(_helixClusterName, realtimeTableName, idealState);
     } else {
@@ -1272,8 +1269,7 @@ public class PinotHelixResourceManager {
     }
   }
 
-  public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type)
-      throws IOException {
+  public void setExistingTableConfig(TableConfig config, String tableNameWithType, TableType type) throws IOException {
     if (type == TableType.REALTIME) {
       ZKMetadataProvider.setRealtimeTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
       ensureRealtimeClusterIsSetUp(config, tableNameWithType, config.getIndexingConfig());
@@ -1284,7 +1280,8 @@ public class PinotHelixResourceManager {
       ZKMetadataProvider.setOfflineTableConfig(_propertyStore, tableNameWithType, config.toZNRecord());
       IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
       final String configReplication = config.getValidationConfig().getReplication();
-      if (configReplication != null && !config.getValidationConfig().getReplication()
+      if (configReplication != null && !config.getValidationConfig()
+          .getReplication()
           .equals(idealState.getReplicas())) {
         HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, new Function<IdealState, IdealState>() {
           @Nullable
@@ -1298,8 +1295,7 @@ public class PinotHelixResourceManager {
     }
   }
 
-  public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs)
-      throws Exception {
+  public void updateMetadataConfigFor(String tableName, TableType type, TableCustomConfig newConfigs) throws Exception {
     String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     if (tableConfig == null) {
@@ -1310,8 +1306,7 @@ public class PinotHelixResourceManager {
   }
 
   public void updateSegmentsValidationAndRetentionConfigFor(String tableName, TableType type,
-      SegmentsValidationAndRetentionConfig newConfigs)
-      throws Exception {
+      SegmentsValidationAndRetentionConfig newConfigs) throws Exception {
     String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     if (tableConfig == null) {
@@ -1321,8 +1316,7 @@ public class PinotHelixResourceManager {
     setExistingTableConfig(tableConfig, tableNameWithType, type);
   }
 
-  public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs)
-      throws Exception {
+  public void updateIndexingConfigFor(String tableName, TableType type, IndexingConfig newConfigs) throws Exception {
     String tableNameWithType = TableNameBuilder.forType(type).tableNameWithType(tableName);
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     if (tableConfig == null) {
@@ -1449,8 +1443,8 @@ public class PinotHelixResourceManager {
       }
     }
 
-    return (status) ? PinotResourceManagerResponse
-        .success("Table " + tableName + " enabled (reset success = " + resetSuccessful + ")")
+    return (status) ? PinotResourceManagerResponse.success(
+        "Table " + tableName + " enabled (reset success = " + resetSuccessful + ")")
         : PinotResourceManagerResponse.success("Table " + tableName + " disabled");
   }
 
@@ -1654,8 +1648,8 @@ public class PinotHelixResourceManager {
     if (customConfig != null) {
       Map<String, String> customConfigMap = customConfig.getCustomConfigs();
       if (customConfigMap != null) {
-        if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean
-            .valueOf(customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) {
+        if (customConfigMap.containsKey(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY) && !Boolean.valueOf(
+            customConfigMap.get(TableCustomConfig.MESSAGE_BASED_REFRESH_KEY))) {
           return false;
         }
       }
@@ -1727,11 +1721,10 @@ public class PinotHelixResourceManager {
     Preconditions.checkNotNull(offlineTableConfig);
     int numReplicas = Integer.parseInt(offlineTableConfig.getValidationConfig().getReplication());
     String serverTenant = TagNameUtils.getOfflineTagForTenant(offlineTableConfig.getTenantConfig().getServer());
-    SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory
-        .getSegmentAssignmentStrategy(offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
-    return segmentAssignmentStrategy
-        .getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore, _helixClusterName, segmentMetadata,
-            numReplicas, serverTenant);
+    SegmentAssignmentStrategy segmentAssignmentStrategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(
+        offlineTableConfig.getValidationConfig().getSegmentAssignmentStrategy());
+    return segmentAssignmentStrategy.getAssignedInstances(_helixZkManager, _helixAdmin, _propertyStore,
+        _helixClusterName, segmentMetadata, numReplicas, serverTenant);
   }
 
   /**
@@ -1796,9 +1789,9 @@ public class PinotHelixResourceManager {
     LOGGER.info("Wait until segment - " + segmentName + " to be OFFLINE in ExternalView");
     if (!ifExternalViewChangeReflectedForState(tableName, segmentName, "OFFLINE",
         _externalViewOnlineToOfflineTimeoutMillis, false)) {
-      LOGGER
-          .error("External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
-              segmentName, _externalViewOnlineToOfflineTimeoutMillis);
+      LOGGER.error(
+          "External view for segment {} did not reflect the ideal state of OFFLINE within the {} ms time limit",
+          segmentName, _externalViewOnlineToOfflineTimeoutMillis);
       return false;
     }
 
@@ -1817,8 +1810,8 @@ public class PinotHelixResourceManager {
     // Check that the ideal state has been written to ZK
     updatedIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
     instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
-    LOGGER
-        .info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(), segmentName);
+    LOGGER.info("Found {} instances for segment '{}', after updating ideal state", instanceStateMap.size(),
+        segmentName);
     for (String state : instanceStateMap.values()) {
       if (!"ONLINE".equals(state)) {
         LOGGER.error("Failed to write ONLINE ideal state!");
@@ -1966,8 +1959,8 @@ public class PinotHelixResourceManager {
       Map<String, String> instanceStateMap = updatedIdealState.getInstanceStateMap(segmentName);
       for (String state : instanceStateMap.values()) {
         if (!status.equals(state)) {
-          return PinotResourceManagerResponse
-              .failure("Failed to update ideal state when setting status " + status + " for segment " + segmentName);
+          return PinotResourceManagerResponse.failure(
+              "Failed to update ideal state when setting status " + status + " for segment " + segmentName);
         }
       }
 
@@ -1977,8 +1970,8 @@ public class PinotHelixResourceManager {
       }
     }
 
-    return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse
-        .success("Segments " + segments + " now " + status)
+    return (externalViewUpdateSuccessful) ? PinotResourceManagerResponse.success(
+        "Segments " + segments + " now " + status)
         : PinotResourceManagerResponse.failure("Timed out. External view not completely updated");
   }
 
@@ -2106,8 +2099,8 @@ public class PinotHelixResourceManager {
       IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, resource);
       for (String partition : idealState.getPartitionSet()) {
         if (idealState.getInstanceSet(partition).contains(instanceName)) {
-          return PinotResourceManagerResponse
-              .failure("Instance " + instanceName + " exists in ideal state for " + resource);
+          return PinotResourceManagerResponse.failure(
+              "Instance " + instanceName + " exists in ideal state for " + resource);
         }
       }
     }
@@ -2186,8 +2179,7 @@ public class PinotHelixResourceManager {
 
   @Nonnull
   public RebalanceResult rebalanceTable(final String rawTableName, TableType tableType,
-      Configuration rebalanceUserConfig)
-      throws InvalidConfigException, TableNotFoundException {
+      Configuration rebalanceUserConfig) throws InvalidConfigException, TableNotFoundException {
 
     TableConfig tableConfig = getTableConfig(rawTableName, tableType);
     if (tableConfig == null) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 0e311e6..f21d4cb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -64,9 +64,11 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyIdealStateFor(String tableName, int numCopies, boolean enableBatchMessageMode) {
     final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableName);
     final int replicas = numCopies;
-    customModeIdealStateBuilder
-        .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
-        .setNumPartitions(0).setNumReplica(replicas).setMaxPartitionsPerNode(1);
+    customModeIdealStateBuilder.setStateModel(
+        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setNumPartitions(0)
+        .setNumReplica(replicas)
+        .setMaxPartitionsPerNode(1);
     final IdealState idealState = customModeIdealStateBuilder.build();
     idealState.setInstanceGroupTag(tableName);
     idealState.setBatchMessageMode(enableBatchMessageMode);
@@ -88,7 +90,8 @@ public class PinotTableIdealStateBuilder {
         new CustomModeISBuilder(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
     customModeIdealStateBuilder.setStateModel(
         PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL)
-        .setMaxPartitionsPerNode(Integer.MAX_VALUE).setNumReplica(Integer.MAX_VALUE)
+        .setMaxPartitionsPerNode(Integer.MAX_VALUE)
+        .setNumReplica(Integer.MAX_VALUE)
         .setNumPartitions(Integer.MAX_VALUE);
     final IdealState idealState = customModeIdealStateBuilder.build();
     idealState.setBatchMessageMode(enableBatchMessageMode);
@@ -118,8 +121,9 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
-  public static void buildLowLevelRealtimeIdealStateFor(String realtimeTableName, TableConfig realtimeTableConfig,
-      IdealState idealState, boolean enableBatchMessageMode) {
+  public static void buildLowLevelRealtimeIdealStateFor(PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager,
+      String realtimeTableName, TableConfig realtimeTableConfig, IdealState idealState,
+      boolean enableBatchMessageMode) {
 
     // Validate replicasPerPartition here.
     final String replicasPerPartitionStr = realtimeTableConfig.getValidationConfig().getReplicasPerPartition();
@@ -136,9 +140,8 @@ public class PinotTableIdealStateBuilder {
     if (idealState == null) {
       idealState = buildEmptyRealtimeIdealStateFor(realtimeTableName, nReplicas, enableBatchMessageMode);
     }
-    final PinotLLCRealtimeSegmentManager segmentManager = PinotLLCRealtimeSegmentManager.getInstance();
     try {
-      segmentManager.setupNewTable(realtimeTableConfig, idealState);
+      pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
     } catch (InvalidConfigException e) {
       throw new IllegalStateException("Caught exception when creating table " + realtimeTableName, e);
     }
@@ -159,9 +162,11 @@ public class PinotTableIdealStateBuilder {
   public static IdealState buildEmptyRealtimeIdealStateFor(String realtimeTableName, int replicaCount,
       boolean enableBatchMessageMode) {
     final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(realtimeTableName);
-    customModeIdealStateBuilder
-        .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
-        .setNumPartitions(0).setNumReplica(replicaCount).setMaxPartitionsPerNode(1);
+    customModeIdealStateBuilder.setStateModel(
+        PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL)
+        .setNumPartitions(0)
+        .setNumReplica(replicaCount)
+        .setMaxPartitionsPerNode(1);
     final IdealState idealState = customModeIdealStateBuilder.build();
     idealState.setInstanceGroupTag(realtimeTableName);
     idealState.setBatchMessageMode(enableBatchMessageMode);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 324b9ae..d21861c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -111,9 +111,6 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
 
-  // TODO: fix the misuse of singleton.
-  private static PinotLLCRealtimeSegmentManager INSTANCE = null;
-
   private final HelixAdmin _helixAdmin;
   private final HelixManager _helixManager;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -139,25 +136,13 @@ public class PinotLLCRealtimeSegmentManager {
     return _controllerConf.generateVipUrl();
   }
 
-  public static synchronized void create(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
+  public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    create(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
+    this(helixResourceManager.getHelixAdmin(), helixResourceManager.getHelixClusterName(),
         helixResourceManager.getHelixZkManager(), helixResourceManager.getPropertyStore(), helixResourceManager,
         controllerConf, controllerMetrics, controllerLeadershipManager);
   }
 
-  private static synchronized void create(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
-      ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    if (INSTANCE != null) {
-      throw new RuntimeException("Instance already created");
-    }
-    INSTANCE =
-        new PinotLLCRealtimeSegmentManager(helixAdmin, clusterName, helixManager, propertyStore, helixResourceManager,
-            controllerConf, controllerMetrics, controllerLeadershipManager);
-    SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics, controllerLeadershipManager);
-  }
-
   public void stop() {
     _isStopping = true;
     LOGGER
@@ -181,8 +166,6 @@ public class PinotLLCRealtimeSegmentManager {
       }
     }
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
-    INSTANCE = null;
-    SegmentCompletionManager.stop();
   }
 
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
@@ -206,13 +189,6 @@ public class PinotLLCRealtimeSegmentManager {
     _controllerLeadershipManager = controllerLeadershipManager;
   }
 
-  public static PinotLLCRealtimeSegmentManager getInstance() {
-    if (INSTANCE == null) {
-      throw new RuntimeException("Not yet created");
-    }
-    return INSTANCE;
-  }
-
   protected boolean isLeader() {
     return _controllerLeadershipManager.isLeader();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 618c4a6..5ae4b25 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -64,9 +64,6 @@ public class SegmentCompletionManager {
     ABORTED,      // state machine is aborted. we will start a fresh one when the next segmentConsumed comes in.
   }
 
-  // TODO: fix the misuse of singleton.
-  private static SegmentCompletionManager _instance = null;
-
   private final HelixManager _helixManager;
   // A map that holds the FSM for each segment.
   private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<>();
@@ -84,12 +81,15 @@ public class SegmentCompletionManager {
 
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
-  protected SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+  public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+      ControllerMetrics controllerMetrics, int segmentCommitTimeoutSeconds,
+      ControllerLeadershipManager controllerLeadershipManager) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
     _controllerLeadershipManager = controllerLeadershipManager;
+    SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
+        TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
   }
 
   public boolean isSplitCommitEnabled() {
@@ -100,25 +100,6 @@ public class SegmentCompletionManager {
     return _segmentManager.getControllerVipUrl();
   }
 
-  public static SegmentCompletionManager create(HelixManager helixManager,
-      PinotLLCRealtimeSegmentManager segmentManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
-    if (_instance != null) {
-      throw new RuntimeException("Cannot create multiple instances");
-    }
-    _instance = new SegmentCompletionManager(helixManager, segmentManager, controllerMetrics, controllerLeadershipManager);
-    SegmentCompletionProtocol.setMaxSegmentCommitTimeMs(
-        TimeUnit.MILLISECONDS.convert(controllerConf.getSegmentCommitTimeoutSeconds(), TimeUnit.SECONDS));
-    return _instance;
-  }
-
-  public static SegmentCompletionManager getInstance() {
-    if (_instance == null) {
-      throw new RuntimeException("Not yet created");
-    }
-    return _instance;
-  }
-
   protected long getCurrentTimeMs() {
     return System.currentTimeMillis();
   }
@@ -140,11 +121,11 @@ public class SegmentCompletionManager {
           // Also good for synchronization, because it is possible that multiple threads take this path, and we don't want
           // multiple instances of the FSM to be created for the same commit sequence at the same time.
           final long endOffset = segmentMetadata.getEndOffset();
-          fsm = SegmentCompletionFSM
-              .fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
+          fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(),
+              endOffset);
         } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
-          fsm = SegmentCompletionFSM
-              .fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
+          fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName,
+              segmentMetadata.getNumReplicas());
         } else {
           // Segment is in the process of completing, and this is the first one to respond. Create fsm
           fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
@@ -627,7 +608,8 @@ public class SegmentCompletionManager {
           LOGGER.error("Segment upload failed");
           return abortAndReturnFailed();
         }
-        SegmentCompletionProtocol.Response response = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
+        SegmentCompletionProtocol.Response response =
+            commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
         if (!response.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
           return abortAndReturnFailed();
         } else {
@@ -644,10 +626,11 @@ public class SegmentCompletionManager {
 
     private SegmentCompletionProtocol.Response commit(String instanceId, long offset) {
       long allowedBuildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 1000;
-      LOGGER
-          .info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset, allowedBuildTimeSec);
+      LOGGER.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset,
+          allowedBuildTimeSec);
       SegmentCompletionProtocol.Response.Params params =
-          new SegmentCompletionProtocol.Response.Params().withOffset(offset).withBuildTimeSeconds(allowedBuildTimeSec)
+          new SegmentCompletionProtocol.Response.Params().withOffset(offset)
+              .withBuildTimeSeconds(allowedBuildTimeSec)
               .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
               .withSplitCommit(_isSplitCommitEnabled);
       if (_isSplitCommitEnabled) {
@@ -676,21 +659,21 @@ public class SegmentCompletionManager {
 
     private SegmentCompletionProtocol.Response hold(String instanceId, long offset) {
       LOGGER.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset);
-      return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params()
-          .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset));
+      return new SegmentCompletionProtocol.Response(new SegmentCompletionProtocol.Response.Params().withStatus(
+          SegmentCompletionProtocol.ControllerResponseStatus.HOLD).withOffset(offset));
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
       _state = State.ABORTED;
-      _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+      _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(),
+          ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return hold(instanceId, offset);
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnFailed() {
       _state = State.ABORTED;
-      _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+      _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_segmentName.getTableName(),
+          ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
@@ -962,9 +945,8 @@ public class SegmentCompletionManager {
 
     private SegmentCompletionProtocol.Response processStoppedConsuming(String instanceId, long offset, String reason,
         boolean createNew) {
-      LOGGER
-          .info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", instanceId,
-              _segmentName, offset, _state, createNew, reason);
+      LOGGER.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}",
+          instanceId, _segmentName, offset, _state, createNew, reason);
       _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
       return SegmentCompletionProtocol.RESP_PROCESSED;
     }
@@ -1008,8 +990,7 @@ public class SegmentCompletionManager {
     }
 
     private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams,
-                                                             boolean isSplitCommit,
-                                                             CommittingSegmentDescriptor committingSegmentDescriptor) {
+        boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
       boolean success;
       String instanceId = reqParams.getInstanceId();
       long offset = reqParams.getOffset();
@@ -1118,10 +1099,6 @@ public class SegmentCompletionManager {
     }
   }
 
-  public static void stop() {
-    _instance = null;
-  }
-
   @VisibleForTesting
   protected boolean isLeader() {
     return _controllerLeadershipManager.isLeader();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index 9bf70bd..b716e90 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -103,6 +103,15 @@ public class PinotControllerModeTest extends ControllerTest {
         "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms.");
     Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY);
 
+    // Start the second Pinot only controller
+    config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++));
+    ControllerStarter controllerStarter = new ControllerStarter(config);
+    controllerStarter.start();
+
+    Thread.sleep(100_000L);
+
+    controllerStarter.stop();
+
     stopController();
     _controllerStarter = null;
     helixControllerStarter.stop();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 0dfe6b4..d004326 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -58,13 +58,11 @@ public class SegmentCompletionTest {
   private final long s3Offset = 30L;
 
   @BeforeMethod
-  public void testCaseSetup()
-      throws Exception {
+  public void testCaseSetup() throws Exception {
     testCaseSetup(true, true);
   }
 
-  public void testCaseSetup(boolean isLeader, boolean isConnected)
-      throws Exception {
+  public void testCaseSetup(boolean isLeader, boolean isConnected) throws Exception {
     segmentManager = new MockPinotLLCRealtimeSegmentManager(isLeader, isConnected);
     ControllerLeadershipManager controllerLeadershipManager = segmentManager.getControllerLeadershipManager();
     final int partitionId = 23;
@@ -78,8 +76,7 @@ public class SegmentCompletionTest {
     metadata.setNumReplicas(3);
     segmentManager._segmentMetadata = metadata;
 
-    segmentCompletionMgr =
-        new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
+    segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, isLeader, isConnected);
     segmentManager._segmentCompletionMgr = segmentCompletionMgr;
 
     Field fsmMapField = SegmentCompletionManager.class.getDeclaredField("_fsmMap");
@@ -93,8 +90,7 @@ public class SegmentCompletionTest {
 
   // Simulate a new controller taking over with an empty completion manager object,
   // but segment metadata is fine in zk
-  private void replaceSegmentCompletionManager()
-      throws Exception {
+  private void replaceSegmentCompletionManager() throws Exception {
     long oldSecs = segmentCompletionMgr._secconds;
     segmentCompletionMgr = new MockSegmentCompletionManager(segmentManager, true, true);
     segmentCompletionMgr._secconds = oldSecs;
@@ -104,8 +100,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testStoppedConsumeDuringCompletion()
-      throws Exception {
+  public void testStoppedConsumeDuringCompletion() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String reason = "IAmLazy";
@@ -154,8 +149,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -172,8 +167,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testStoppedConsumeBeforeHold()
-      throws Exception {
+  public void testStoppedConsumeBeforeHold() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String reason = "IAmLazy";
@@ -224,8 +218,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -243,13 +237,14 @@ public class SegmentCompletionTest {
 
   // s2 sends stoppedConsuming message, but then may have gotten restarted, so eventually we complete the segment.
   @Test
-  public void testHappyPathAfterStoppedConsuming()
-      throws Exception {
+  public void testHappyPathAfterStoppedConsuming() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._secconds = 5;
 
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withReason("some reason");
     response = segmentCompletionMgr.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
@@ -262,21 +257,18 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testHappyPath()
-      throws Exception {
+  public void testHappyPath() throws Exception {
     testHappyPath(5L);
   }
 
   // Tests happy path with split commit protocol
   @Test
-  public void testHappyPathSplitCommit()
-      throws Exception {
+  public void testHappyPathSplitCommit() throws Exception {
     testHappyPathSplitCommit(5L);
   }
 
   @Test
-  public void testExceptionInConsumedMessage()
-      throws Exception {
+  public void testExceptionInConsumedMessage() throws Exception {
     segmentManager._segmentMetadata = null;
 
     SegmentCompletionProtocol.Response response;
@@ -289,8 +281,7 @@ public class SegmentCompletionTest {
 
   // When commit segment file fails, makes sure the fsm aborts and that a segment can successfully commit afterwards.
   @Test
-  public void testCommitSegmentFileFail()
-      throws Exception {
+  public void testCommitSegmentFileFail() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -333,10 +324,12 @@ public class SegmentCompletionTest {
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     // s2's file does not successfully commit
     segmentCompletionMgr._secconds += 5;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("doNotCommitMe");
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have aborted
@@ -367,17 +360,18 @@ public class SegmentCompletionTest {
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     // s2's file successfully commits
     segmentCompletionMgr._secconds += 5;
-    params = new Request.Params().withInstanceId(s3).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s3)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT_SUCCESS);
     // And the FSM should be removed.
     Assert.assertFalse(fsmMap.containsKey(segmentNameStr));
   }
 
-  public void testHappyPathSplitCommit(long startTime)
-      throws Exception {
+  public void testHappyPathSplitCommit(long startTime) throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -420,10 +414,12 @@ public class SegmentCompletionTest {
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
 
     segmentCompletionMgr._secconds += 5;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -441,8 +437,7 @@ public class SegmentCompletionTest {
 
   // Tests that we abort when the server instance comes back with a different offset than it is told to commit with
   @Test
-  public void testCommitDifferentOffsetSplitCommit()
-      throws Exception {
+  public void testCommitDifferentOffsetSplitCommit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -470,10 +465,12 @@ public class SegmentCompletionTest {
 
     // s3 comes back to try to commit with a different offset
     segmentCompletionMgr._secconds += 5;
-    params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s3)
+        .withOffset(s3Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, true, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, true,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.FAILED);
 
     // Now the FSM should have disappeared from the map
@@ -489,8 +486,7 @@ public class SegmentCompletionTest {
     Assert.assertTrue(fsmMap.containsKey(segmentNameStr));
   }
 
-  public void testHappyPath(long startTime)
-      throws Exception {
+  public void testHappyPath(long startTime) throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -532,8 +528,8 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
 
     // Now the FSM should have disappeared from the map
@@ -550,50 +546,57 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testControllerNotConnected()
-      throws Exception {
+  public void testControllerNotConnected() throws Exception {
     testCaseSetup(true, false); // Leader but not connected
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
     segmentCompletionMgr._secconds = 5L;
-    params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s1)
+        .withOffset(s1Offset)
+        .withSegmentName(segmentNameStr)
         .withReason("rowLimit");
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.NOT_LEADER);
   }
 
   @Test
-  public void testWinnerOnTimeLimit()
-      throws Exception {
+  public void testWinnerOnTimeLimit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._secconds = 10L;
-    params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s1)
+        .withOffset(s1Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.HOLD);
   }
 
   @Test
-  public void testWinnerOnRowLimit()
-      throws Exception {
+  public void testWinnerOnRowLimit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     segmentCompletionMgr._secconds = 10L;
-    params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s1)
+        .withOffset(s1Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.COMMIT);
     // S2 comes with the same offset as S1
     segmentCompletionMgr._secconds += 1;
-    params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s1Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
     segmentCompletionMgr._secconds += 1;
     // S3 comes with a different offset and without row limit. we ask it to hold even though it is higher.
-    params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s3)
+        .withOffset(s3Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.HOLD);
@@ -605,16 +608,20 @@ public class SegmentCompletionTest {
 
     segmentCompletionMgr._secconds += 5;
     params = new Request.Params().withInstanceId(s1).withOffset(s1Offset).withSegmentName(segmentNameStr);
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // We ask S2 to keep the segment
-    params = new Request.Params().withInstanceId(s2).withOffset(s1Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s1Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_ROW_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.KEEP);
     // And we ask S3 to discard because it was ahead.
-    params = new Request.Params().withInstanceId(s3).withOffset(s3Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s3)
+        .withOffset(s3Offset)
+        .withSegmentName(segmentNameStr)
         .withReason(SegmentCompletionProtocol.REASON_TIME_LIMIT);
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.DISCARD);
@@ -622,19 +629,16 @@ public class SegmentCompletionTest {
 
   // Tests that when server is delayed(Stalls for a hour), when server comes back, we commit successfully.
   @Test
-  public void testDelayedServerSplitCommit()
-      throws Exception {
+  public void testDelayedServerSplitCommit() throws Exception {
     testDelayedServer(true);
   }
 
   @Test
-  public void testDelayedServer()
-      throws Exception {
+  public void testDelayedServer() throws Exception {
     testDelayedServer(false);
   }
 
-  public void testDelayedServer(boolean isSplitCommit)
-      throws Exception {
+  public void testDelayedServer(boolean isSplitCommit) throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -670,7 +674,9 @@ public class SegmentCompletionTest {
     response = segmentCompletionMgr.segmentCommitStart(params);
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE);
     segmentCompletionMgr._secconds += 5;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr.segmentCommitEnd(params, true, isSplitCommit,
         CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
@@ -689,8 +695,7 @@ public class SegmentCompletionTest {
 
   // We test the case where all servers go silent after controller asks one of them commit
   @Test
-  public void testDeadServers()
-      throws Exception {
+  public void testDeadServers() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -737,8 +742,7 @@ public class SegmentCompletionTest {
 
   // We test the case when the committer is asked to commit, but they never come back.
   @Test
-  public void testCommitterFailure()
-      throws Exception {
+  public void testCommitterFailure() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -807,8 +811,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testHappyPathSlowCommit()
-      throws Exception {
+  public void testHappyPathSlowCommit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 1509242466s;
@@ -845,7 +848,9 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal request for 20s
     segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -853,7 +858,9 @@ public class SegmentCompletionTest {
 
     // Another lease extension in 19s.
     segmentCompletionMgr._secconds += 19;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -867,16 +874,15 @@ public class SegmentCompletionTest {
     long commitTimeMs = (segmentCompletionMgr._secconds - startTime) * 1000;
     Assert.assertEquals(commitTimeMap.get(tableName).longValue(), commitTimeMs);
     segmentCompletionMgr._secconds += 55;
-    response = segmentCompletionMgr
-        .segmentCommitEnd(params, true, false, CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
+    response = segmentCompletionMgr.segmentCommitEnd(params, true, false,
+        CommittingSegmentDescriptor.fromSegmentCompletionReqParams(params));
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
     // now FSM should be out of the map.
     Assert.assertFalse((fsmMap.containsKey(segmentNameStr)));
   }
 
   @Test
-  public void testFailedSlowCommit()
-      throws Exception {
+  public void testFailedSlowCommit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     final String tableName = new LLCSegmentName(segmentNameStr).getTableName();
@@ -912,7 +918,9 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal request for 20s
     segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -929,8 +937,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testLeaseTooLong()
-      throws Exception {
+  public void testLeaseTooLong() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -965,7 +972,9 @@ public class SegmentCompletionTest {
 
     // Fast forward to one second before commit time, and send a lease renewal request for 20s
     segmentCompletionMgr._secconds = startTime + commitTimeSec - 1;
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withExtraTimeSec(20);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -973,9 +982,11 @@ public class SegmentCompletionTest {
 
     final int leaseTimeSec = 20;
     // Lease will not be granted if the time taken so far plus lease time exceeds the max allowabale.
-    while (segmentCompletionMgr._secconds + leaseTimeSec <= startTime + SegmentCompletionManager
-        .getMaxCommitTimeForAllSegmentsSeconds()) {
-      params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    while (segmentCompletionMgr._secconds + leaseTimeSec
+        <= startTime + SegmentCompletionManager.getMaxCommitTimeForAllSegmentsSeconds()) {
+      params = new Request.Params().withInstanceId(s2)
+          .withOffset(s2Offset)
+          .withSegmentName(segmentNameStr)
           .withExtraTimeSec(leaseTimeSec);
       response = segmentCompletionMgr.extendBuildTime(params);
       Assert.assertEquals(response.getStatus(), ControllerResponseStatus.PROCESSED);
@@ -984,7 +995,9 @@ public class SegmentCompletionTest {
     }
 
     // Now the lease request should fail.
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withExtraTimeSec(leaseTimeSec);
     response = segmentCompletionMgr.extendBuildTime(params);
     Assert.assertEquals(response.getStatus(), ControllerResponseStatus.FAILED);
@@ -992,8 +1005,7 @@ public class SegmentCompletionTest {
   }
 
   @Test
-  public void testControllerFailureDuringCommit()
-      throws Exception {
+  public void testControllerFailureDuringCommit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -1052,8 +1064,7 @@ public class SegmentCompletionTest {
   // Tests that when controller fails, after controller failure, commit doesn't continue. Then because all server instances
   // are in holding state, we will ask one to commit.
   @Test
-  public void testControllerFailureDuringSplitCommit()
-      throws Exception {
+  public void testControllerFailureDuringSplitCommit() throws Exception {
     SegmentCompletionProtocol.Response response;
     Request.Params params;
     // s1 sends offset of 20, gets HOLD at t = 5s;
@@ -1104,15 +1115,16 @@ public class SegmentCompletionTest {
     Assert.assertTrue(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
 
     // So s2 goes back into HOLDING state. s1 and s3 are already holding, so now it will get COMMIT back.
-    params = new Request.Params().withInstanceId(s2).withOffset(s2Offset).withSegmentName(segmentNameStr)
+    params = new Request.Params().withInstanceId(s2)
+        .withOffset(s2Offset)
+        .withSegmentName(segmentNameStr)
         .withSegmentLocation("location");
     response = segmentCompletionMgr.segmentConsumed(params);
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.COMMIT);
   }
 
   @Test
-  public void testNotLeader()
-      throws Exception {
+  public void testNotLeader() throws Exception {
     testCaseSetup(false, true);
     SegmentCompletionProtocol.Response response;
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
@@ -1157,8 +1169,8 @@ public class SegmentCompletionTest {
     public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
       _segmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
       _segmentMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
-      _segmentMetadata.setDownloadUrl(ControllerConf
-          .constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(),
+      _segmentMetadata.setDownloadUrl(
+          ControllerConf.constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(),
               CONTROLLER_CONF.generateVipUrl()));
       _segmentMetadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs());
       return true;
@@ -1194,15 +1206,15 @@ public class SegmentCompletionTest {
       this(createMockHelixManager(isLeader, isConnected), segmentManager, isLeader, isConnected);
     }
 
-    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
-        boolean isConnected) {
+    protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
+        boolean isLeader, boolean isConnected) {
       this(helixManager, segmentManager, isLeader, isConnected, new ControllerLeadershipManager(helixManager));
     }
 
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader, boolean isConnected, ControllerLeadershipManager controllerLeadershipManager) {
       super(helixManager, segmentManager, new ControllerMetrics(new MetricsRegistry()),
-          controllerLeadershipManager);
+          SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds(), controllerLeadershipManager);
       _isLeader = isLeader;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org