You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:47:43 UTC

[incubator-pinot] branch add-logic-for-lead-controller-resource updated (df041cb -> 78c1340)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard df041cb  Add logic for lead controller resource on controller side
     new 78c1340  Add logic for lead controller resource on controller side

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (df041cb)
            \
             N -- N -- N   refs/heads/add-logic-for-lead-controller-resource (78c1340)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/java/org/apache/pinot/controller/ControllerStarter.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add logic for lead controller resource on controller side

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 78c134081aee26930b3de37cc86904380ad9c4e4
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Fri Jun 14 16:35:14 2019 -0700

    Add logic for lead controller resource on controller side
---
 .../apache/pinot/controller/ControllerStarter.java | 44 ++++++++-----
 .../pinot/controller/LeadControllerManager.java    | 76 ++++++++++++++++++++++
 .../PinotSegmentUploadRestletResource.java         |  7 +-
 .../controller/api/upload/SegmentValidator.java    | 10 +--
 .../controller/helix/SegmentStatusChecker.java     |  8 ++-
 .../helix/core/PinotHelixResourceManager.java      | 19 +++++-
 .../helix/core/minion/PinotTaskManager.java        |  8 ++-
 .../core/periodictask/ControllerPeriodicTask.java  |  9 ++-
 .../ControllerPeriodicTaskScheduler.java           |  8 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 32 ++++-----
 .../core/realtime/SegmentCompletionManager.java    | 54 ++++++++-------
 .../core/relocation/RealtimeSegmentRelocator.java  |  8 ++-
 .../helix/core/retention/RetentionManager.java     |  8 ++-
 .../core/statemodel/LeadControllerChecker.java     | 55 ++++++++++++++++
 ...rollerResourceMasterSlaveStateModelFactory.java | 64 ++++++++++++++++++
 .../BrokerResourceValidationManager.java           |  5 +-
 .../validation/OfflineSegmentIntervalChecker.java  | 13 ++--
 .../RealtimeSegmentValidationManager.java          |  8 ++-
 .../controller/validation/StorageQuotaChecker.java | 14 ++--
 .../controller/helix/SegmentStatusCheckerTest.java | 73 ++++++++++++++++++---
 .../periodictask/ControllerPeriodicTaskTest.java   |  7 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  7 +-
 .../helix/core/realtime/SegmentCompletionTest.java | 10 ++-
 .../relocation/RealtimeSegmentRelocatorTest.java   | 14 ++--
 .../helix/core/retention/RetentionManagerTest.java | 12 +++-
 .../validation/StorageQuotaCheckerTest.java        | 27 ++++----
 26 files changed, 455 insertions(+), 145 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..97b6095 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -113,6 +113,7 @@ public class ControllerStarter {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private SegmentCompletionManager _segmentCompletionManager;
   private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
@@ -255,6 +256,10 @@ public class ControllerStarter {
     _helixResourceManager.start();
     HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
 
+    // Get lead controller manager from resource manager, register controller leadership manager to get helix leader.
+    _leadControllerManager = _helixResourceManager.getLeadControllerManager();
+    _leadControllerManager.registerControllerLeadershipManager(_controllerLeadershipManager);
+
     LOGGER.info("Registering controller leadership manager");
     // TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove
     //       ControllerLeadershipManager and this callback.
@@ -266,15 +271,13 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-
     _pinotLLCRealtimeSegmentManager =
-        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
-            _controllerLeadershipManager);
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _leadControllerManager);
     // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
     _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
     _segmentCompletionManager =
         new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
-            _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
 
     if (_config.getHLCTablesAllowed()) {
       LOGGER.info("Realtime tables with High Level consumers will be supported");
@@ -289,7 +292,9 @@ public class ControllerStarter {
     List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
     _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
-    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
+    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _leadControllerManager);
+
+    _controllerPeriodicTaskScheduler.start();
 
     LOGGER.info("Registering rebalance segments factory");
     _helixResourceManager
@@ -327,7 +332,7 @@ public class ControllerStarter {
         bind(_controllerMetrics).to(ControllerMetrics.class);
         bind(accessControlFactory).to(AccessControlFactory.class);
         bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
-        bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
+        bind(_leadControllerManager).to(LeadControllerManager.class);
       }
     });
 
@@ -433,24 +438,29 @@ public class ControllerStarter {
   protected List<PeriodicTask> setupControllerPeriodicTasks() {
     LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
-    _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
+    _taskManager =
+        new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
+            _controllerMetrics);
     periodicTasks.add(_taskManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
+    _retentionManager =
+        new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
-            _controllerMetrics);
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager,
+            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
-        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
-            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
+            _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
-        new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
+        new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
-    _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
+    _segmentStatusChecker =
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
+    _realtimeSegmentRelocator =
+        new RealtimeSegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_realtimeSegmentRelocator);
 
     return periodicTasks;
@@ -482,6 +492,10 @@ public class ControllerStarter {
       LOGGER.info("Stopping controller leadership manager");
       _controllerLeadershipManager.stop();
 
+      // Stop controller periodic task.
+      LOGGER.info("Stopping controller periodic tasks");
+      _controllerPeriodicTaskScheduler.stop();
+
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
       _pinotLLCRealtimeSegmentManager.stop();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
new file mode 100644
index 0000000..9e4d3fb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/** Decides per-table controller leadership via {@link LeadControllerChecker}, falling back to the Helix leader. */
+public class LeadControllerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class);
+
+  private final Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>();
+  private final LeadControllerChecker _leadControllerChecker;
+  private ControllerLeadershipManager _controllerLeadershipManager;
+
+  public LeadControllerManager() {
+    _leadControllerChecker = new LeadControllerChecker();
+  }
+
+  /** Registers the leadership manager that {@link #isLeaderForTable} falls back to. */
+  public void registerControllerLeadershipManager(ControllerLeadershipManager controllerLeadershipManager) {
+    _controllerLeadershipManager = controllerLeadershipManager;
+  }
+
+  /** Returns true if the checker reports leadership of the table's partition, else the fallback manager's answer. */
+  public boolean isLeaderForTable(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    // floorMod keeps the index in [0, N) for negative hash codes; plain % could yield a negative index.
+    int partitionIndex = Math.floorMod(rawTableName.hashCode(), NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+    return _leadControllerChecker.isPartitionLeader(partitionIndex)
+        || (_controllerLeadershipManager != null && _controllerLeadershipManager.isLeader());
+  }
+
+  /** Delegates to {@link LeadControllerChecker#addPartitionLeader} for the given partition name. */
+  public synchronized void addPartitionLeader(String partitionName) {
+    _leadControllerChecker.addPartitionLeader(partitionName);
+  }
+
+  /** Delegates to {@link LeadControllerChecker#removePartitionLeader} for the given partition name. */
+  public synchronized void removePartitionLeader(String partitionName) {
+    _leadControllerChecker.removePartitionLeader(partitionName);
+  }
+
+  /** Records the subscriber under the given class name; no leadership callback is invoked from here yet. */
+  public void subscribe(String className, LeadershipChangeSubscriber subscriber) {
+    LOGGER.info("{} subscribing to leadership changes", className);
+    _subscribers.put(className, subscriber);
+  }
+
+  /** Hook for lead controller changes; currently a no-op. */
+  public void onLeadControllerChange() {
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bd4b8a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource {
   AccessControlFactory _accessControlFactory;
 
   @Inject
-  ControllerLeadershipManager _controllerLeadershipManager;
+  LeadControllerManager _leadControllerManager;
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource {
       // Validate segment
       SegmentValidatorResponse segmentValidatorResponse =
           new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-              _controllerMetrics, _controllerLeadershipManager)
+              _controllerMetrics, _leadControllerManager)
               .validateSegment(rawTableName, segmentMetadata, tempSegmentDir);
 
       // Zk operations
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index 39a9657..42c7cf8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -55,17 +55,17 @@ public class SegmentValidator {
   private final Executor _executor;
   private final HttpConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
       Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _controllerConf = controllerConf;
     _executor = executor;
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public SegmentValidatorResponse validateSegment(String rawTableName, SegmentMetadata segmentMetadata,
@@ -135,7 +135,7 @@ public class SegmentValidator {
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker quotaChecker =
         new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     return quotaChecker.isSegmentStorageWithinQuota(segmentFile, metadata.getName(),
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index f40e2b3..9261420 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -56,10 +57,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * @param pinotHelixResourceManager The resource checker used to interact with Helix
    * @param config The controller configuration object
    */
-  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a4d4d0b..e33ee62 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -99,6 +98,7 @@ import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -106,6 +106,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -144,6 +145,7 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
+  private LeadControllerManager _leadControllerManager;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
@@ -170,6 +172,7 @@ public class PinotHelixResourceManager {
    * Create Helix cluster if needed, and then start a Pinot controller instance.
    */
   public synchronized void start() {
+    _leadControllerManager = new LeadControllerManager();
     _helixZkManager = registerAndConnectAsHelixParticipant();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -255,6 +258,16 @@ public class PinotHelixResourceManager {
     return _propertyStore;
   }
 
+
+  /**
+   * Get lead controller manager.
+   *
+   * @return lead controller manager
+   */
+  public LeadControllerManager getLeadControllerManager() {
+    return _leadControllerManager;
+  }
+
   /**
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
@@ -263,8 +276,8 @@ public class PinotHelixResourceManager {
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
 
     // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
-    helixManager.getStateMachineEngine()
-        .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+    helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name,
+        new LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager));
 
     try {
       helixManager.connect();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a09c00..649b966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -51,10 +52,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
-      PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, controllerMetrics);
+        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
+        controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7e764f3..76d106f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerMetrics controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
   }
 
@@ -75,6 +79,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
         LOGGER.info("Task: {} is stopped, early terminate the task", _taskName);
         break;
       }
+      if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+        continue;
+      }
       try {
         processTable(tableNameWithType, context);
       } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index 8a2b63c..858bfca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.List;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.LeadershipChangeSubscriber;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -39,11 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
    * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
    * This is called only once during controller startup
    * @param controllerPeriodicTasks
-   * @param controllerLeadershipManager
+   * @param leadControllerManager
    */
-  public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
+  public void init(List<PeriodicTask> controllerPeriodicTasks, LeadControllerManager leadControllerManager) {
     super.init(controllerPeriodicTasks);
-    controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+    leadControllerManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..0982c30 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final TableConfigCache _tableConfigCache;
   private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
     _helixAdmin = helixResourceManager.getHelixAdmin();
     _helixManager = helixResourceManager.getHelixZkManager();
     _propertyStore = helixResourceManager.getPropertyStore();
@@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager {
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
 
@@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 
   protected boolean isConnected() {
@@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager {
     URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
     PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(tableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}",
-          segmentName, segmentLocation, tableName, isLeader(), isConnected());
+          segmentName, segmentLocation, tableName, isLeader(tableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
@@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager {
     final String oldZnodePath =
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, committingSegmentNameStr);
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(realtimeTableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
     boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, realtimeTableName, stat.getVersion());
     if (!success) {
       LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager {
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, newSegmentNameStr);
 
     if (!isNewTableSetup) {
-      if (!isLeader() || !isConnected()) {
+      if (!isLeader(realtimeTableName) || !isConnected()) {
         // We can potentially log a different value than what we saw ....
         LOGGER.warn(
             "Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}",
-            newSegmentNameStr, rawTableName, isLeader(), isConnected());
+            newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
         _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
         return false;
       }
@@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager {
     boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, realtimeTableName);
     if (!success) {
       LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          newSegmentNameStr, rawTableName, isLeader(), isConnected());
+          newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager {
 
     return idealState;
   }
-
-  public ControllerLeadershipManager getControllerLeadershipManager() {
-    return _controllerLeadershipManager;
-  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a4da6b4..8935c6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.SegmentName.SEPARATOR;
+
 
 /**
  * This is a singleton class in the controller that drives the state machines for segments that are in the
@@ -73,7 +72,7 @@ public class SegmentCompletionManager {
   private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
   private final Lock[] _fsmLocks;
   private static final int NUM_FSM_LOCKS = 20;
 
@@ -87,12 +86,12 @@ public class SegmentCompletionManager {
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
   public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager,
       int segmentCommitTimeoutSeconds) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
     SegmentCompletionProtocol
         .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
     _fsmLocks = new Lock[NUM_FSM_LOCKS];
@@ -163,11 +162,12 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it continues to consume).
    */
   public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final String stopReason = reqParams.getReason();
     final long offset = reqParams.getOffset();
@@ -201,11 +201,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(
       final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
@@ -225,11 +226,12 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final int extTimeSec = reqParams.getExtraTimeSec();
@@ -256,11 +258,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(
       SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final String reason = reqParams.getReason();
@@ -292,11 +295,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
       boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
     SegmentCompletionFSM fsm = null;
     SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
@@ -352,6 +356,7 @@ public class SegmentCompletionManager {
     State _state = State.HOLDING;   // Typically start off in HOLDING state.
     final long _startTimeMs;
     private final LLCSegmentName _segmentName;
+    private final String _realtimeTableName;
     private final int _numReplicas;
     private final Set<String> _excludedServerStateMap;
     private final Map<String, Long> _commitStateMap;
@@ -394,6 +399,7 @@ public class SegmentCompletionManager {
     private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
         SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
       _segmentName = segmentName;
+      _realtimeTableName = _segmentName.getTableName();
       _numReplicas = numReplicas;
       _segmentManager = segmentManager;
       _commitStateMap = new HashMap<>(_numReplicas);
@@ -403,8 +409,8 @@ public class SegmentCompletionManager {
       _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
       _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
       long initialCommitTimeMs =
-          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_segmentName.getTableName());
-      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(segmentName.getTableName());
+          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
+      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(_realtimeTableName);
       if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
         initialCommitTimeMs = savedCommitTime;
       }
@@ -413,7 +419,7 @@ public class SegmentCompletionManager {
         // The table has a really high value configured for max commit time. Set it to a higher value than default
         // and go from there.
         LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", initialCommitTimeMs / 1000,
-            segmentName.getTableName(), MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
+            _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
         initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
       }
       _initialCommitTimeMs = initialCommitTimeMs;
@@ -681,14 +687,14 @@ public class SegmentCompletionManager {
     private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return hold(instanceId, offset);
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnFailed() {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
@@ -1117,7 +1123,7 @@ public class SegmentCompletionManager {
   }
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index ea5b05b..2c2c017 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
-  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 7d5182d..fecb4ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -54,10 +55,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
-        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
     _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
new file mode 100644
index 0000000..69b56a3
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+/**
+ * Tracks the partitions of the lead controller resource that have been registered as led by this controller.
+ *
+ * <p>Partitions are keyed by the integer that follows the last underscore in the partition name.
+ */
+public class LeadControllerChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class);
+
+  // Partition index -> partition name. ConcurrentHashMap rejects null values, so the name is stored as the value.
+  private final Map<Integer, String> _partitionCache;
+
+  public LeadControllerChecker() {
+    _partitionCache = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Registers the given partition. Registering a partition that is already present is a no-op.
+   *
+   * @param partitionName partition name whose suffix after the last underscore is the partition index
+   * @throws NumberFormatException if that suffix is not an integer
+   */
+  public void addPartitionLeader(String partitionName) {
+    LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName);
+    _partitionCache.putIfAbsent(getPartitionIndex(partitionName), partitionName);
+  }
+
+  /**
+   * Unregisters the given partition; does nothing if it was not registered.
+   *
+   * @param partitionName partition name whose suffix after the last underscore is the partition index
+   * @throws NumberFormatException if that suffix is not an integer
+   */
+  public void removePartitionLeader(String partitionName) {
+    LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName);
+    _partitionCache.remove(getPartitionIndex(partitionName));
+  }
+
+  /**
+   * Returns whether the given partition is currently registered.
+   *
+   * @param partitionIndex index in the range [0, NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE)
+   * @return true if the partition has been added and not removed since
+   * @throws IllegalArgumentException if the index is out of range
+   */
+  public boolean isPartitionLeader(int partitionIndex) {
+    Preconditions.checkArgument(
+        partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE,
+        "Invalid partition index: " + partitionIndex);
+    return _partitionCache.containsKey(partitionIndex);
+  }
+
+  /** Extracts the partition index, i.e. the integer following the last underscore of the partition name. */
+  private static int getPartitionIndex(String partitionName) {
+    return Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..d063c58
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * State model factory for the lead controller resource. The state models it creates report MASTER/SLAVE
+ * transitions of each partition to the {@link LeadControllerManager}.
+ */
+public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class);
+
+  private final LeadControllerManager _leadControllerManager;
+
+  public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) {
+    super();
+    _leadControllerManager = leadControllerManager;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel();
+    stateModel.setPartitionName(partitionName);
+    return stateModel;
+  }
+
+  /**
+   * State model that registers the partition with the {@link LeadControllerManager} when this controller is promoted
+   * to MASTER, and unregisters it when this controller is demoted back to SLAVE.
+   */
+  public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel {
+    @Override
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      super.onBecomeMasterFromSlave(message, context);
+      // SLAVE -> MASTER: this controller now leads the partition.
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.addPartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+
+    @Override
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      super.onBecomeSlaveFromMaster(message, context);
+      // MASTER -> SLAVE: this controller no longer leads the partition.
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.removePartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 5748d3c..a82a161 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
 
   public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7f19395..0eeadc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.util.SegmentIntervalUtils;
@@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics,
+      ControllerMetrics controllerMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _validationMetrics = validationMetrics;
   }
 
@@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
         if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
           segmentIntervals.add(timeInterval);
         } else {
-          numSegmentsWithInvalidIntervals ++;
+          numSegmentsWithInvalidIntervals++;
         }
       }
       if (numSegmentsWithInvalidIntervals > 0) {
-        LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName,
-            numSegmentsWithInvalidIntervals);
+        LOGGER
+            .warn("Table: {} has {} segments with invalid interval", offlineTableName, numSegmentsWithInvalidIntervals);
       }
       Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
       numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5eb5a6f..88ad642 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..c78c756 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.slf4j.Logger;
@@ -48,16 +48,16 @@ public class StorageQuotaChecker {
   private final TableConfig _tableConfig;
   private final ControllerMetrics _controllerMetrics;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
       ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _tableConfig = tableConfig;
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public static class QuotaCheckerResponse {
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
         tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -213,7 +213,7 @@ public class StorageQuotaChecker {
     }
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index c235483..066040e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +48,7 @@ import static org.mockito.Mockito.when;
 public class SegmentStatusCheckerTest {
   private SegmentStatusChecker segmentStatusChecker;
   private PinotHelixResourceManager helixResourceManager;
+  private LeadControllerManager leadControllerManager;
   private MetricsRegistry metricsRegistry;
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
@@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -463,9 +506,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index a928922..dcd5469 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -27,10 +27,12 @@ import java.util.stream.IntStream;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest {
   private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+  private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class);
   private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
   private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest {
   private static final String TASK_NAME = "TestTask";
 
   private final ControllerPeriodicTask _task = new ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _leadControllerManager,
+      _controllerMetrics) {
 
     @Override
     protected void setUpTask() {
@@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest {
     List<String> tables = new ArrayList<>(_numTables);
     IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
     when(_resourceManager.getAllTables()).thenReturn(tables);
+    when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
   }
 
   private void resetState() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 04e2b7b..af35420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,7 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -66,7 +66,6 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.zookeeper.data.Stat;
@@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         List<String> existingLLCSegments, ControllerMetrics controllerMetrics) {
       super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+          new LeadControllerManager());
 
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
@@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return IS_LEADER;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..22e317f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -1151,8 +1150,7 @@ public class SegmentCompletionTest {
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new LeadControllerManager());
     }
 
     @Override
@@ -1210,7 +1208,7 @@ public class SegmentCompletionTest {
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader, ControllerMetrics controllerMetrics) {
       super(helixManager, segmentManager, controllerMetrics,
-          new ControllerLeadershipManager(helixManager, controllerMetrics),
+          new LeadControllerManager(),
           SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
       _isLeader = isLeader;
     }
@@ -1221,7 +1219,7 @@ public class SegmentCompletionTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return _isLeader;
     }
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 8c5c8ec..9316062 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest {
 
   private TestRealtimeSegmentRelocator _realtimeSegmentRelocator;
   private HelixManager _mockHelixManager;
+  private LeadControllerManager _leadControllerManager;
 
   private String[] serverNames;
   private String[] consumingServerNames;
@@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest {
     PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     _mockHelixManager = mock(HelixManager.class);
     when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
+    LeadControllerManager mockLeadControllerManager = mock(LeadControllerManager.class);
+    when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     ControllerConf controllerConfig = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _realtimeSegmentRelocator =
-        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
+        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, mockLeadControllerManager, controllerConfig,
+            controllerMetrics);
 
     final int maxInstances = 20;
     serverNames = new String[maxInstances];
@@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest {
 
     private Map<String, List<String>> tagToInstances;
 
-    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, config, controllerMetrics);
+    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+        LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+      super(pinotHelixResourceManager, leadControllerManager, config, controllerMetrics);
       tagToInstances = new HashedMap();
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d4adfe1..465a9af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,6 +85,9 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
@@ -91,7 +95,8 @@ public class RetentionManagerTest {
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
@@ -210,11 +215,14 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index e84a947..f8fb6ca 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest {
   private TableConfig _tableConfig;
   private ControllerMetrics _controllerMetrics;
   private PinotHelixResourceManager _pinotHelixResourceManager;
-  private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private QuotaConfig _quotaConfig;
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest {
     _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
-    _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
+    _leadControllerManager = mock(LeadControllerManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
     TEST_DIR.mkdirs();
@@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest {
     when(_quotaConfig.getStorage()).thenReturn("3K");
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
-    StorageQuotaChecker.QuotaCheckerResponse response =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
+            _leadControllerManager);
+    StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L);
@@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest {
 
     public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
         ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-        ControllerLeadershipManager controllerLeadershipManager) {
-      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
+        LeadControllerManager leadControllerManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, leadControllerManager);
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return true;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org