You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:47:44 UTC

[incubator-pinot] 01/01: Add logic for lead controller resource on controller side

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 78c134081aee26930b3de37cc86904380ad9c4e4
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Fri Jun 14 16:35:14 2019 -0700

    Add logic for lead controller resource on controller side
---
 .../apache/pinot/controller/ControllerStarter.java | 44 ++++++++-----
 .../pinot/controller/LeadControllerManager.java    | 76 ++++++++++++++++++++++
 .../PinotSegmentUploadRestletResource.java         |  7 +-
 .../controller/api/upload/SegmentValidator.java    | 10 +--
 .../controller/helix/SegmentStatusChecker.java     |  8 ++-
 .../helix/core/PinotHelixResourceManager.java      | 19 +++++-
 .../helix/core/minion/PinotTaskManager.java        |  8 ++-
 .../core/periodictask/ControllerPeriodicTask.java  |  9 ++-
 .../ControllerPeriodicTaskScheduler.java           |  8 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 32 ++++-----
 .../core/realtime/SegmentCompletionManager.java    | 54 ++++++++-------
 .../core/relocation/RealtimeSegmentRelocator.java  |  8 ++-
 .../helix/core/retention/RetentionManager.java     |  8 ++-
 .../core/statemodel/LeadControllerChecker.java     | 55 ++++++++++++++++
 ...rollerResourceMasterSlaveStateModelFactory.java | 64 ++++++++++++++++++
 .../BrokerResourceValidationManager.java           |  5 +-
 .../validation/OfflineSegmentIntervalChecker.java  | 13 ++--
 .../RealtimeSegmentValidationManager.java          |  8 ++-
 .../controller/validation/StorageQuotaChecker.java | 14 ++--
 .../controller/helix/SegmentStatusCheckerTest.java | 73 ++++++++++++++++++---
 .../periodictask/ControllerPeriodicTaskTest.java   |  7 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  7 +-
 .../helix/core/realtime/SegmentCompletionTest.java | 10 ++-
 .../relocation/RealtimeSegmentRelocatorTest.java   | 14 ++--
 .../helix/core/retention/RetentionManagerTest.java | 12 +++-
 .../validation/StorageQuotaCheckerTest.java        | 27 ++++----
 26 files changed, 455 insertions(+), 145 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..97b6095 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -113,6 +113,7 @@ public class ControllerStarter {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private SegmentCompletionManager _segmentCompletionManager;
   private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
@@ -255,6 +256,10 @@ public class ControllerStarter {
     _helixResourceManager.start();
     HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
 
+    // Get lead controller manager from resource manager, register controller leadership manager to get helix leader.
+    _leadControllerManager = _helixResourceManager.getLeadControllerManager();
+    _leadControllerManager.registerControllerLeadershipManager(_controllerLeadershipManager);
+
     LOGGER.info("Registering controller leadership manager");
     // TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove
     //       ControllerLeadershipManager and this callback.
@@ -266,15 +271,13 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-
     _pinotLLCRealtimeSegmentManager =
-        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
-            _controllerLeadershipManager);
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _leadControllerManager);
     // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
     _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
     _segmentCompletionManager =
         new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
-            _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
 
     if (_config.getHLCTablesAllowed()) {
       LOGGER.info("Realtime tables with High Level consumers will be supported");
@@ -289,7 +292,9 @@ public class ControllerStarter {
     List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
     _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
-    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
+    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _leadControllerManager);
+
+    _controllerPeriodicTaskScheduler.start();
 
     LOGGER.info("Registering rebalance segments factory");
     _helixResourceManager
@@ -327,7 +332,7 @@ public class ControllerStarter {
         bind(_controllerMetrics).to(ControllerMetrics.class);
         bind(accessControlFactory).to(AccessControlFactory.class);
         bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
-        bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
+        bind(_leadControllerManager).to(LeadControllerManager.class);
       }
     });
 
@@ -433,24 +438,29 @@ public class ControllerStarter {
   protected List<PeriodicTask> setupControllerPeriodicTasks() {
     LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
-    _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
+    _taskManager =
+        new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
+            _controllerMetrics);
     periodicTasks.add(_taskManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
+    _retentionManager =
+        new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
-            _controllerMetrics);
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager,
+            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
-        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
-            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
+            _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
-        new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
+        new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
-    _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
+    _segmentStatusChecker =
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
+    _realtimeSegmentRelocator =
+        new RealtimeSegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_realtimeSegmentRelocator);
 
     return periodicTasks;
@@ -482,6 +492,10 @@ public class ControllerStarter {
       LOGGER.info("Stopping controller leadership manager");
       _controllerLeadershipManager.stop();
 
+      // Stop controller periodic task.
+      LOGGER.info("Stopping controller periodic tasks");
+      _controllerPeriodicTaskScheduler.stop();
+
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
       _pinotLLCRealtimeSegmentManager.stop();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
new file mode 100644
index 0000000..9e4d3fb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+public class LeadControllerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class);
+
+  private Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>();
+
+  private final LeadControllerChecker _leadControllerChecker;
+  private ControllerLeadershipManager _controllerLeadershipManager;
+
+  public LeadControllerManager() {
+    _leadControllerChecker = new LeadControllerChecker();
+  }
+
+  public void registerControllerLeadershipManager(ControllerLeadershipManager controllerLeadershipManager) {
+    _controllerLeadershipManager = controllerLeadershipManager;
+  }
+
+  public boolean isLeaderForTable(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    int partitionIndex = rawTableName.hashCode() % NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+    if (_leadControllerChecker.isPartitionLeader(partitionIndex)) {
+      return true;
+    } else if (_controllerLeadershipManager != null) {
+      return _controllerLeadershipManager.isLeader();
+    } else {
+      return false;
+    }
+  }
+
+  public synchronized void addPartitionLeader(String partitionName) {
+    _leadControllerChecker.addPartitionLeader(partitionName);
+  }
+
+  public synchronized void removePartitionLeader(String partitionName) {
+    _leadControllerChecker.removePartitionLeader(partitionName);
+  }
+
+  public void subscribe(String className, LeadershipChangeSubscriber subscriber) {
+    LOGGER.info("{} subscribing to leadership changes", className);
+    _subscribers.put(className, subscriber);
+//      subscriber.onBecomingLeader();
+  }
+
+  public void onLeadControllerChange() {
+
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bd4b8a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource {
   AccessControlFactory _accessControlFactory;
 
   @Inject
-  ControllerLeadershipManager _controllerLeadershipManager;
+  LeadControllerManager _leadControllerManager;
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource {
       // Validate segment
       SegmentValidatorResponse segmentValidatorResponse =
           new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-              _controllerMetrics, _controllerLeadershipManager)
+              _controllerMetrics, _leadControllerManager)
               .validateSegment(rawTableName, segmentMetadata, tempSegmentDir);
 
       // Zk operations
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index 39a9657..42c7cf8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -55,17 +55,17 @@ public class SegmentValidator {
   private final Executor _executor;
   private final HttpConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
       Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _controllerConf = controllerConf;
     _executor = executor;
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public SegmentValidatorResponse validateSegment(String rawTableName, SegmentMetadata segmentMetadata,
@@ -135,7 +135,7 @@ public class SegmentValidator {
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker quotaChecker =
         new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     return quotaChecker.isSegmentStorageWithinQuota(segmentFile, metadata.getName(),
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index f40e2b3..9261420 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -56,10 +57,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * @param pinotHelixResourceManager The resource checker used to interact with Helix
    * @param config The controller configuration object
    */
-  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a4d4d0b..e33ee62 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -99,6 +98,7 @@ import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -106,6 +106,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -144,6 +145,7 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
+  private LeadControllerManager _leadControllerManager;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
@@ -170,6 +172,7 @@ public class PinotHelixResourceManager {
    * Create Helix cluster if needed, and then start a Pinot controller instance.
    */
   public synchronized void start() {
+    _leadControllerManager = new LeadControllerManager();
     _helixZkManager = registerAndConnectAsHelixParticipant();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -255,6 +258,16 @@ public class PinotHelixResourceManager {
     return _propertyStore;
   }
 
+
+  /**
+   * Get lead controller manager.
+   *
+   * @return lead controller manager
+   */
+  public LeadControllerManager getLeadControllerManager() {
+    return _leadControllerManager;
+  }
+
   /**
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
@@ -263,8 +276,8 @@ public class PinotHelixResourceManager {
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
 
     // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
-    helixManager.getStateMachineEngine()
-        .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+    helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name,
+        new LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager));
 
     try {
       helixManager.connect();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a09c00..649b966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -51,10 +52,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
-      PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, controllerMetrics);
+        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
+        controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7e764f3..76d106f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerMetrics controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
   }
 
@@ -75,6 +79,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
         LOGGER.info("Task: {} is stopped, early terminate the task", _taskName);
         break;
       }
+      if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+        continue;
+      }
       try {
         processTable(tableNameWithType, context);
       } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index 8a2b63c..858bfca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.List;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.LeadershipChangeSubscriber;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -39,11 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
    * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
    * This is called only once during controller startup
    * @param controllerPeriodicTasks
-   * @param controllerLeadershipManager
+   * @param leadControllerManager
    */
-  public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
+  public void init(List<PeriodicTask> controllerPeriodicTasks, LeadControllerManager leadControllerManager) {
     super.init(controllerPeriodicTasks);
-    controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+    leadControllerManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..0982c30 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final TableConfigCache _tableConfigCache;
   private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
     _helixAdmin = helixResourceManager.getHelixAdmin();
     _helixManager = helixResourceManager.getHelixZkManager();
     _propertyStore = helixResourceManager.getPropertyStore();
@@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager {
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
 
@@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 
   protected boolean isConnected() {
@@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager {
     URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
     PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(tableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}",
-          segmentName, segmentLocation, tableName, isLeader(), isConnected());
+          segmentName, segmentLocation, tableName, isLeader(tableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
@@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager {
     final String oldZnodePath =
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, committingSegmentNameStr);
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(realtimeTableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
     boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, realtimeTableName, stat.getVersion());
     if (!success) {
       LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager {
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, newSegmentNameStr);
 
     if (!isNewTableSetup) {
-      if (!isLeader() || !isConnected()) {
+      if (!isLeader(realtimeTableName) || !isConnected()) {
         // We can potentially log a different value than what we saw ....
         LOGGER.warn(
             "Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}",
-            newSegmentNameStr, rawTableName, isLeader(), isConnected());
+            newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
         _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
         return false;
       }
@@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager {
     boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, realtimeTableName);
     if (!success) {
       LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          newSegmentNameStr, rawTableName, isLeader(), isConnected());
+          newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager {
 
     return idealState;
   }
-
-  public ControllerLeadershipManager getControllerLeadershipManager() {
-    return _controllerLeadershipManager;
-  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a4da6b4..8935c6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.SegmentName.SEPARATOR;
+
 
 /**
  * This is a singleton class in the controller that drives the state machines for segments that are in the
@@ -73,7 +72,7 @@ public class SegmentCompletionManager {
   private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
   private final Lock[] _fsmLocks;
   private static final int NUM_FSM_LOCKS = 20;
 
@@ -87,12 +86,12 @@ public class SegmentCompletionManager {
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
   public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager,
       int segmentCommitTimeoutSeconds) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
     SegmentCompletionProtocol
         .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
     _fsmLocks = new Lock[NUM_FSM_LOCKS];
@@ -163,11 +162,12 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it continues to consume).
    */
   public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final String stopReason = reqParams.getReason();
     final long offset = reqParams.getOffset();
@@ -201,11 +201,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(
       final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
@@ -225,11 +226,12 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final int extTimeSec = reqParams.getExtraTimeSec();
@@ -256,11 +258,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(
       SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final String reason = reqParams.getReason();
@@ -292,11 +295,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
       boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
     SegmentCompletionFSM fsm = null;
     SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
@@ -352,6 +356,7 @@ public class SegmentCompletionManager {
     State _state = State.HOLDING;   // Typically start off in HOLDING state.
     final long _startTimeMs;
     private final LLCSegmentName _segmentName;
+    private final String _realtimeTableName;
     private final int _numReplicas;
     private final Set<String> _excludedServerStateMap;
     private final Map<String, Long> _commitStateMap;
@@ -394,6 +399,7 @@ public class SegmentCompletionManager {
     private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
         SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
       _segmentName = segmentName;
+      _realtimeTableName = _segmentName.getTableName();
       _numReplicas = numReplicas;
       _segmentManager = segmentManager;
       _commitStateMap = new HashMap<>(_numReplicas);
@@ -403,8 +409,8 @@ public class SegmentCompletionManager {
       _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
       _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
       long initialCommitTimeMs =
-          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_segmentName.getTableName());
-      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(segmentName.getTableName());
+          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
+      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(_realtimeTableName);
       if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
         initialCommitTimeMs = savedCommitTime;
       }
@@ -413,7 +419,7 @@ public class SegmentCompletionManager {
         // The table has a really high value configured for max commit time. Set it to a higher value than default
         // and go from there.
         LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", initialCommitTimeMs / 1000,
-            segmentName.getTableName(), MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
+            _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
         initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
       }
       _initialCommitTimeMs = initialCommitTimeMs;
@@ -681,14 +687,14 @@ public class SegmentCompletionManager {
     private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return hold(instanceId, offset);
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnFailed() {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
@@ -1117,7 +1123,7 @@ public class SegmentCompletionManager {
   }
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index ea5b05b..2c2c017 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
-  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 7d5182d..fecb4ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -54,10 +55,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
-        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
     _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
new file mode 100644
index 0000000..69b56a3
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+public class LeadControllerChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class);
+
+  private Map<Integer, String> _partitionCache;
+
+  public LeadControllerChecker() {
+    _partitionCache = new ConcurrentHashMap<>();
+  }
+
+  public void addPartitionLeader(String partitionName) {
+    LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName);
+    int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+    _partitionCache.putIfAbsent(partitionIndex, null);
+  }
+
+  public void removePartitionLeader(String partitionName) {
+    LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName);
+    int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+    _partitionCache.remove(partitionIndex);
+  }
+
+  public boolean isPartitionLeader(int partitionIndex) {
+    Preconditions.checkArgument(partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, "Invalid partition index: " + partitionIndex);
+    return _partitionCache.containsKey(partitionIndex);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..d063c58
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class);
+
+  private final LeadControllerManager _leadControllerManager;
+
+  public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) {
+    super();
+    _leadControllerManager = leadControllerManager;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel();
+    stateModel.setPartitionName(partitionName);
+    return stateModel;
+  }
+
+  public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel {
+    @Override
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      super.onBecomeSlaveFromMaster(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.addPartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+
+    @Override
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      super.onBecomeMasterFromSlave(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.removePartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 5748d3c..a82a161 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
 
   public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7f19395..0eeadc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.util.SegmentIntervalUtils;
@@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics,
+      ControllerMetrics controllerMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _validationMetrics = validationMetrics;
   }
 
@@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
         if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
           segmentIntervals.add(timeInterval);
         } else {
-          numSegmentsWithInvalidIntervals ++;
+          numSegmentsWithInvalidIntervals++;
         }
       }
       if (numSegmentsWithInvalidIntervals > 0) {
-        LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName,
-            numSegmentsWithInvalidIntervals);
+        LOGGER
+            .warn("Table: {} has {} segments with invalid interval", offlineTableName, numSegmentsWithInvalidIntervals);
       }
       Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
       numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5eb5a6f..88ad642 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..c78c756 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.slf4j.Logger;
@@ -48,16 +48,16 @@ public class StorageQuotaChecker {
   private final TableConfig _tableConfig;
   private final ControllerMetrics _controllerMetrics;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
       ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _tableConfig = tableConfig;
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public static class QuotaCheckerResponse {
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
         tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -213,7 +213,7 @@ public class StorageQuotaChecker {
     }
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index c235483..066040e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +48,7 @@ import static org.mockito.Mockito.when;
 public class SegmentStatusCheckerTest {
   private SegmentStatusChecker segmentStatusChecker;
   private PinotHelixResourceManager helixResourceManager;
+  private LeadControllerManager leadControllerManager;
   private MetricsRegistry metricsRegistry;
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
@@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -463,9 +506,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index a928922..dcd5469 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -27,10 +27,12 @@ import java.util.stream.IntStream;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest {
   private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+  private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class);
   private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
   private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest {
   private static final String TASK_NAME = "TestTask";
 
   private final ControllerPeriodicTask _task = new ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _leadControllerManager,
+      _controllerMetrics) {
 
     @Override
     protected void setUpTask() {
@@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest {
     List<String> tables = new ArrayList<>(_numTables);
     IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
     when(_resourceManager.getAllTables()).thenReturn(tables);
+    when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
   }
 
   private void resetState() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 04e2b7b..af35420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,7 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -66,7 +66,6 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.zookeeper.data.Stat;
@@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         List<String> existingLLCSegments, ControllerMetrics controllerMetrics) {
       super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+          new LeadControllerManager());
 
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
@@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return IS_LEADER;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..22e317f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -1151,8 +1150,7 @@ public class SegmentCompletionTest {
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new LeadControllerManager());
     }
 
     @Override
@@ -1210,7 +1208,7 @@ public class SegmentCompletionTest {
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader, ControllerMetrics controllerMetrics) {
       super(helixManager, segmentManager, controllerMetrics,
-          new ControllerLeadershipManager(helixManager, controllerMetrics),
+          new LeadControllerManager(),
           SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
       _isLeader = isLeader;
     }
@@ -1221,7 +1219,7 @@ public class SegmentCompletionTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return _isLeader;
     }
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 8c5c8ec..9316062 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest {
 
   private TestRealtimeSegmentRelocator _realtimeSegmentRelocator;
   private HelixManager _mockHelixManager;
+  private LeadControllerManager _leadControllerManager;
 
   private String[] serverNames;
   private String[] consumingServerNames;
@@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest {
     PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     _mockHelixManager = mock(HelixManager.class);
     when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
+    LeadControllerManager mockLeadControllerManager = mock(LeadControllerManager.class);
+    when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     ControllerConf controllerConfig = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _realtimeSegmentRelocator =
-        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
+        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, mockLeadControllerManager, controllerConfig,
+            controllerMetrics);
 
     final int maxInstances = 20;
     serverNames = new String[maxInstances];
@@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest {
 
     private Map<String, List<String>> tagToInstances;
 
-    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, config, controllerMetrics);
+    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+        LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+      super(pinotHelixResourceManager, leadControllerManager, config, controllerMetrics);
       tagToInstances = new HashedMap();
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d4adfe1..465a9af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,6 +85,9 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
@@ -91,7 +95,8 @@ public class RetentionManagerTest {
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
@@ -210,11 +215,14 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index e84a947..f8fb6ca 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest {
   private TableConfig _tableConfig;
   private ControllerMetrics _controllerMetrics;
   private PinotHelixResourceManager _pinotHelixResourceManager;
-  private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private QuotaConfig _quotaConfig;
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest {
     _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
-    _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
+    _leadControllerManager = mock(LeadControllerManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
     TEST_DIR.mkdirs();
@@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest {
     when(_quotaConfig.getStorage()).thenReturn("3K");
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
-    StorageQuotaChecker.QuotaCheckerResponse response =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
+            _leadControllerManager);
+    StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L);
@@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest {
 
     public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
         ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-        ControllerLeadershipManager controllerLeadershipManager) {
-      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
+        LeadControllerManager leadControllerManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, leadControllerManager);
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return true;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org