You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/06/14 23:45:57 UTC

[incubator-pinot] branch add-logic-for-lead-controller-resource updated (2bb1476 -> df041cb)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 2bb1476  Add logic for lead controller resource on controller side
 discard d07e0ba  Add logic for leveraging lead controller resource
     add 1257a35  Clean up table name access for StorageQuotaChecker (#4302)
     add 753801a  [TE] Dispaly alert description in alert emails (#4292)
     add c4e1ac9  [TE] Auto-Resolving Create-Update API for alerts (#4295)
     add 23cb9ed  [TE] Improve log debugging - leverage logback capabilities to log thread id; make logging class concise (#4291)
     add d4474b9  [TE] populate the feedback in DTO for child anomalies (#4301)
     add 6c4138f  [TE] Detection model maintenance flow (#4266)
     add 38a1804  [TE] Translator code refactor to accommodate entity config translation (#4286)
     add 4c3be2d  [TE] add path for create/update api (#4305)
     add baca816  Adding example of querying BYTES column (#4304)
     add 2a4b425  [TE] Clean up exceptions in Yaml Resource (#4303)
     add 9f3fe4e  PQL -> SQL enhancement - phase 1 - new Pinot Query Struct (#4216)
     add c238c2b  Added tests for RealtimeSegmentValidationManager (#4306)
     add c7e2481  [TE] frontend - harleyjj/screenshot - Adds bounds to Screenshot and Anomalies route (#4300)
     add 27ad26c  Adding script to build and publish docker image (#4200)
     add 8b45b8f  Prepare licenses for 0.2 (#4290)
     add 891b31c  [TE] frontend - harleyjj/alerts - refactor alerts controller to maintain consistent state (#4299)
     add 5390395  Adding ORC configuration in docs (#4045)
     add e397731  [TE] Propagate diagnostics and evaluations from AnoamlyFilterWrapper (#4315)
     add fc4165d  [TE] Revert HTTPS redirect (#4316)
     add 2e543e9  Add interface and implementations for the new segment assignment (#4269)
     add eee245e  [TE] Use multimap in metric urn as dimension (#4313)
     new e87ee39  Add logic for leveraging lead controller resource
     new df041cb  Add logic for lead controller resource on controller side

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (2bb1476)
            \
             N -- N -- N   refs/heads/add-logic-for-lead-controller-resource (df041cb)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 LICENSE-binary                                     |  263 +--
 NOTICE-binary                                      |  771 +++++----
 docker/Dockerfile                                  |   62 +
 docker/README.md                                   |  116 ++
 docker/docker-build-and-push.sh                    |   30 +
 docker/docker-build.sh                             |   47 +
 docker/docker-push.sh                              |   29 +
 docs/pql_examples.rst                              |   13 +-
 docs/record_reader.rst                             |   11 +
 licenses-binary/LICENSE-aopalliance.txt            |    2 +
 licenses-binary/LICENSE-asm.txt                    |   27 +
 licenses-binary/LICENSE-esotericsoftware.txt       |   11 +
 licenses-binary/LICENSE-jcraft.txt                 |   32 +
 licenses-binary/LICENSE-protobuf.txt               |   32 +
 licenses-binary/LICENSE-xmlenc.txt                 |   27 +
 pinot-common/pom.xml                               |    7 +
 .../pinot/common/metadata/ZKMetadataProvider.java  |    4 +-
 .../pinot/common/request/AggregationInfo.java      |    6 +-
 .../apache/pinot/common/request/BrokerRequest.java |  133 +-
 .../request/{QuerySource.java => DataSource.java}  |   61 +-
 .../apache/pinot/common/request/Expression.java    |  774 +++++++++
 .../{FilterOperator.java => ExpressionType.java}   |   31 +-
 .../pinot/common/request/FilterOperator.java       |    2 +-
 .../apache/pinot/common/request/FilterQuery.java   |    6 +-
 .../pinot/common/request/FilterQueryMap.java       |    4 +-
 .../Function.java}                                 |  365 ++--
 .../org/apache/pinot/common/request/GroupBy.java   |   70 +-
 .../pinot/common/request/HavingFilterQuery.java    |    6 +-
 .../pinot/common/request/HavingFilterQueryMap.java |    4 +-
 .../request/{QuerySource.java => Identifier.java}  |  200 +--
 .../pinot/common/request/InstanceRequest.java      |   40 +-
 .../org/apache/pinot/common/request/Literal.java   |  771 +++++++++
 .../apache/pinot/common/request/PinotQuery.java    | 1807 ++++++++++++++++++++
 .../apache/pinot/common/request/QuerySource.java   |    4 +-
 .../org/apache/pinot/common/request/QueryType.java |   14 +-
 .../org/apache/pinot/common/request/Selection.java |   76 +-
 .../apache/pinot/common/request/SelectionSort.java |    6 +-
 .../pinot/common/response/ProcessingException.java |    6 +-
 .../utils/InstancePartitionsType.java}             |   26 +-
 .../java/org/apache/pinot/common/utils/Pairs.java  |   37 +-
 .../pinot/common/utils/request/RequestUtils.java   |   99 +-
 .../parsers/PinotQuery2BrokerRequestConverter.java |  354 ++++
 .../org/apache/pinot/pql/parsers/Pql2Compiler.java |   98 +-
 .../apache/pinot/pql/parsers/pql2/ast/AstNode.java |    5 +
 .../pinot/pql/parsers/pql2/ast/BaseAstNode.java    |   14 +
 .../parsers/pql2/ast/BetweenPredicateAstNode.java  |   31 +-
 .../parsers/pql2/ast/BooleanOperatorAstNode.java   |   10 +
 .../pql2/ast/ComparisonPredicateAstNode.java       |   70 +
 .../ast/{OptionsAstNode.java => FilterKind.java}   |   29 +-
 .../pinot/pql/parsers/pql2/ast/GroupByAstNode.java |   14 +
 .../pinot/pql/parsers/pql2/ast/HavingAstNode.java  |    5 +
 .../pql/parsers/pql2/ast/InPredicateAstNode.java   |   35 +-
 .../pinot/pql/parsers/pql2/ast/OptionAstNode.java  |   30 +
 .../pinot/pql/parsers/pql2/ast/OptionsAstNode.java |   10 +-
 .../pinot/pql/parsers/pql2/ast/OrderByAstNode.java |   24 +
 .../pql/parsers/pql2/ast/OutputColumnAstNode.java  |   27 +
 .../parsers/pql2/ast/OutputColumnListAstNode.java  |    6 +
 .../pql/parsers/pql2/ast/PredicateAstNode.java     |    9 +-
 .../pql/parsers/pql2/ast/PredicateListAstNode.java |   67 +-
 .../pql2/ast/PredicateParenthesisGroupAstNode.java |    6 +
 .../pql2/ast/RegexpLikePredicateAstNode.java       |   18 +
 .../pinot/pql/parsers/pql2/ast/SelectAstNode.java  |   31 +
 .../parsers/pql2/ast/StarColumnListAstNode.java    |    9 +
 .../pinot/pql/parsers/pql2/ast/WhereAstNode.java   |    7 +
 .../apache/pinot/pql/parsers/Pql2CompilerTest.java |  184 +-
 .../request/BrokerRequestSerializationTest.java    |  143 ++
 pinot-common/src/test/resources/pql_queries.list   |  814 +++++++++
 pinot-common/src/thrift/query.thrift               |   70 +
 pinot-common/src/thrift/request.thrift             |    2 +
 .../apache/pinot/controller/ControllerConf.java    |   19 +-
 .../controller/api/upload/SegmentValidator.java    |    3 +-
 .../helix/core/assignment/InstancePartitions.java  |   99 ++
 .../core/assignment/InstancePartitionsUtils.java   |   97 ++
 ...OfflineBalanceNumSegmentAssignmentStrategy.java |   96 ++
 ...flineReplicaGroupSegmentAssignmentStrategy.java |  200 +++
 ...ealtimeBalanceNumSegmentAssignmentStrategy.java |  155 ++
 ...ltimeReplicaGroupSegmentAssignmentStrategy.java |  159 ++
 .../segment/SegmentAssignmentStrategy.java         |   59 +
 .../segment/SegmentAssignmentStrategyFactory.java  |   57 +
 .../assignment/segment/SegmentAssignmentUtils.java |  276 +++
 .../helix/core/minion/PinotTaskManager.java        |    2 +-
 .../RealtimeSegmentValidationManager.java          |    9 +-
 .../controller/validation/StorageQuotaChecker.java |   28 +-
 ...ineBalanceNumSegmentAssignmentStrategyTest.java |  137 ++
 ...eReplicaGroupSegmentAssignmentStrategyTest.java |  289 ++++
 ...imeBalanceNumSegmentAssignmentStrategyTest.java |  208 +++
 ...eReplicaGroupSegmentAssignmentStrategyTest.java |  230 +++
 .../segment/SegmentAssignmentTestUtils.java        |   28 +-
 .../segment/SegmentAssignmentUtilsTest.java        |  434 +++++
 .../validation/StorageQuotaCheckerTest.java        |   19 +-
 .../core/query/reduce/InAndNotInComparison.java    |    1 +
 .../pinot/reduce/HavingClauseComparisonTests.java  |    1 -
 .../ControllerPeriodicTasksIntegrationTests.java   |   71 +-
 .../app/pods/application/route.js                  |    7 +-
 .../app/pods/components/alert-details/component.js |   24 +-
 .../pods/components/anomaly-summary/component.js   |   50 +-
 .../pods/components/anomaly-summary/template.hbs   |    1 +
 .../pods/components/timeseries-chart/component.js  |   50 +-
 .../app/pods/manage/alerts/index/controller.js     |   49 +-
 .../app/pods/manage/alerts/index/route.js          |    1 -
 .../app/pods/manage/alerts/index/template.hbs      |    4 +-
 .../app/pods/manage/explore/route.js               |   21 +-
 .../app/pods/manage/explore/template.hbs           |    1 +
 .../app/pods/screenshot/controller.js              |   39 +-
 .../app/styles/components/timeseries-chart.scss    |   22 +-
 thirdeye/thirdeye-frontend/app/utils/rca-utils.js  |    6 +-
 .../pinot/thirdeye/anomaly/task/TaskDriver.java    |   40 +-
 .../api/application/ApplicationResource.java       |    2 +-
 .../api/user/dashboard/UserDashboardResource.java  |   12 +-
 .../v2/rootcause/AnomalyEventFormatter.java        |    3 +-
 .../bao/jdbc/MergedAnomalyResultManagerImpl.java   |    2 +-
 .../thirdeye/datalayer/dto/EvaluationDTO.java      |    9 +-
 .../datalayer/pojo/DetectionConfigBean.java        |    9 +
 .../thirdeye/datalayer/pojo/EvaluationBean.java    |    6 +-
 .../detection/DetectionPipelineTaskRunner.java     |   10 +-
 .../pinot/thirdeye/detection/DetectionUtils.java   |    5 +-
 .../thirdeye/detection/ModelMaintenanceFlow.java   |   41 +
 .../pinot/thirdeye/detection/ModelRetuneFlow.java  |  110 ++
 .../pinot/thirdeye/detection/alert/AlertUtils.java |    2 +-
 .../detection/alert/DetectionAlertScheduler.java   |    4 +-
 .../detection/alert/DetectionAlertTaskRunner.java  |   21 +-
 .../alert/scheme/DetectionEmailAlerter.java        |    2 +-
 .../pinot/thirdeye/detection/annotation/Yaml.java  |   39 -
 .../annotation/registry/DetectionRegistry.java     |   33 +-
 .../MapeAveragePercentageChangeModelEvaluator.java |    4 +-
 .../onboard/YamlOnboardingTaskRunner.java          |    6 -
 .../validators/DetectionConfigValidator.java       |   20 +-
 .../validators/SubscriptionConfigValidator.java    |    2 +-
 .../detection/wrapper/AnomalyFilterWrapper.java    |   10 +-
 .../thirdeye/detection/yaml/YamlResource.java      |  265 +--
 .../yaml/translator/ConfigTranslator.java          |   21 +-
 ...nslator.java => DetectionConfigTranslator.java} |  120 +-
 ...ator.java => SubscriptionConfigTranslator.java} |   45 +-
 .../translator/YamlDetectionConfigTranslator.java  |   71 -
 .../translator/YamlDetectionTranslatorLoader.java  |   45 -
 .../thirdeye/detector/holiday-anomaly-report.ftl   |    3 +
 .../detection/DefaultModelMaintenanceFlowTest.java |  111 ++
 .../MapePercentageChangeModelEvaluatorTest.java    |    2 +-
 .../detection/components/MockModelEvaluator.java   |   45 +
 .../detection/components/MockTunableDetector.java  |   60 +
 .../detection/spec/MockModelEvaluatorSpec.java     |   38 +
 .../thirdeye/detection/spec/MockTunableSpec.java   |   26 +
 .../thirdeye/detection/yaml/YamlResourceTest.java  |   57 +-
 ...est.java => DetectionConfigTranslatorTest.java} |   25 +-
 .../YamlDetectionAlertConfigTranslatorTest.java    |    8 +-
 .../yaml/detection/detection-config-1.yaml         |   22 +
 .../yaml/detection/detection-config-2.yaml         |   22 +
 ...multiple-anomalies-email-content-formatter.html |    3 +
 148 files changed, 10914 insertions(+), 1573 deletions(-)
 create mode 100644 docker/Dockerfile
 create mode 100644 docker/README.md
 create mode 100755 docker/docker-build-and-push.sh
 create mode 100755 docker/docker-build.sh
 create mode 100755 docker/docker-push.sh
 create mode 100644 licenses-binary/LICENSE-aopalliance.txt
 create mode 100644 licenses-binary/LICENSE-asm.txt
 create mode 100644 licenses-binary/LICENSE-esotericsoftware.txt
 create mode 100644 licenses-binary/LICENSE-jcraft.txt
 create mode 100644 licenses-binary/LICENSE-protobuf.txt
 create mode 100644 licenses-binary/LICENSE-xmlenc.txt
 copy pinot-common/src/main/java/org/apache/pinot/common/request/{QuerySource.java => DataSource.java} (87%)
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/request/Expression.java
 copy pinot-common/src/main/java/org/apache/pinot/common/request/{FilterOperator.java => ExpressionType.java} (70%)
 copy pinot-common/src/main/java/org/apache/pinot/common/{response/ProcessingException.java => request/Function.java} (51%)
 copy pinot-common/src/main/java/org/apache/pinot/common/request/{QuerySource.java => Identifier.java} (64%)
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/request/Literal.java
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/common/request/PinotQuery.java
 copy pinot-common/src/main/java/org/apache/pinot/{pql/parsers/pql2/ast/OptionsAstNode.java => common/utils/InstancePartitionsType.java} (54%)
 create mode 100644 pinot-common/src/main/java/org/apache/pinot/pql/parsers/PinotQuery2BrokerRequestConverter.java
 copy pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/{OptionsAstNode.java => FilterKind.java} (67%)
 create mode 100644 pinot-common/src/test/resources/pql_queries.list
 create mode 100644 pinot-common/src/thrift/query.thrift
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java
 create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java
 create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java
 create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
 create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
 copy pinot-common/src/main/java/org/apache/pinot/pql/parsers/pql2/ast/OptionsAstNode.java => pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentTestUtils.java (62%)
 create mode 100644 pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java
 create mode 100644 thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/ModelMaintenanceFlow.java
 create mode 100644 thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/ModelRetuneFlow.java
 delete mode 100644 thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/annotation/Yaml.java
 rename thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/{CompositePipelineConfigTranslator.java => DetectionConfigTranslator.java} (81%)
 rename thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/{YamlDetectionAlertConfigTranslator.java => SubscriptionConfigTranslator.java} (80%)
 delete mode 100644 thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlDetectionConfigTranslator.java
 delete mode 100644 thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/YamlDetectionTranslatorLoader.java
 create mode 100644 thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/DefaultModelMaintenanceFlowTest.java
 create mode 100644 thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockModelEvaluator.java
 create mode 100644 thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/components/MockTunableDetector.java
 create mode 100644 thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockModelEvaluatorSpec.java
 create mode 100644 thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/spec/MockTunableSpec.java
 rename thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/translator/{CompositePipelineConfigTranslatorTest.java => DetectionConfigTranslatorTest.java} (77%)
 create mode 100644 thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/detection/detection-config-1.yaml
 create mode 100644 thirdeye/thirdeye-pinot/src/test/resources/org/apache/pinot/thirdeye/detection/yaml/detection/detection-config-2.yaml


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/02: Add logic for leveraging lead controller resource

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e87ee3966a57cb6943620f78a2b518961a09656a
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Mon Jun 10 22:30:24 2019 -0700

    Add logic for leveraging lead controller resource
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   4 +-
 .../manager/realtime/RealtimeTableDataManager.java |   2 +-
 .../realtime/SegmentBuildTimeLeaseExtender.java    |   8 +-
 .../server/realtime/ControllerLeaderLocator.java   | 106 ++++++++++++++++++---
 .../ServerSegmentCompletionProtocolHandler.java    |   7 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |   2 +-
 .../realtime/ControllerLeaderLocatorTest.java      |  19 +++-
 .../tests/SegmentCompletionIntegrationTests.java   |   2 +-
 8 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 59d2bb8..2e2aced 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
     _tableConfig = tableConfig;
+    _tableNameWithType = _tableConfig.getTableName();
     _realtimeTableDataManager = realtimeTableDataManager;
     _resourceDataDir = resourceDataDir;
     _indexLoadingConfig = indexLoadingConfig;
@@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     _instanceId = _realtimeTableDataManager.getServerInstance();
     _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId);
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType);
 
     // TODO Validate configs
     IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
@@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _segmentName = new LLCSegmentName(_segmentNameStr);
     _streamPartitionId = _segmentName.getPartitionId();
-    _tableNameWithType = _tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
     _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index cde73a4..8b81dc6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -89,7 +89,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
 
   @Override
   protected void doInit() {
-    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics);
+    _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType);
 
     File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
     try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 6676619..fcd6422 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -57,20 +57,20 @@ public class SegmentBuildTimeLeaseExtender {
   }
 
   public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId,
-      ServerMetrics serverMetrics) {
+      ServerMetrics serverMetrics, String tableNameWithType) {
     SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId);
     if (leaseExtender != null) {
       LOGGER.warn("Instance already exists");
     } else {
-      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics);
+      leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType);
       INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender);
     }
     return leaseExtender;
   }
 
-  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics) {
+  private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) {
     _instanceId = instanceId;
-    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics);
+    _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType);
     _executor = new ScheduledThreadPoolExecutor(1);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
index 8a3b0fb..07b0087 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
@@ -19,15 +19,20 @@
 package org.apache.pinot.server.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.Set;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
 import org.apache.pinot.core.query.utils.Pair;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+
 // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting
 // the old controller as leader, and it will keep returning NOT_LEADER.
 
@@ -78,35 +83,106 @@ public class ControllerLeaderLocator {
   /**
    * Locate the controller leader so that we can send LLC segment completion requests to it.
    * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid
-   *
+   * @param rawTableName table name without type.
    * @return The host:port string of the current controller leader.
    */
-  public synchronized Pair<String, Integer> getControllerLeader() {
+  public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) {
     if (!_cachedControllerLeaderInvalid) {
       return _controllerLeaderHostPort;
     }
 
+    String leaderForTable = getLeaderForTable(rawTableName);
+    if (leaderForTable == null) {
+      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
+      _cachedControllerLeaderInvalid = true;
+      return null;
+    } else {
+      _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable);
+      _cachedControllerLeaderInvalid = false;
+      LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(),
+          _controllerLeaderHostPort.getSecond());
+      return _controllerLeaderHostPort;
+    }
+  }
+
+  /**
+   * If partition leader exists, use this as the leader for realtime segment completion.
+   * Otherwise, try to use Helix leader.
+   * @param rawTableName table name without type
+   * @return the leader for this table.
+   */
+  private String getLeaderForTable(String rawTableName) {
+    String leaderForTable;
+    ExternalView leadControllerResourceExternalView = _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+    String partitionLeader = getPartitionLeader(leadControllerResourceExternalView, rawTableName);
+    if (partitionLeader != null) {
+      leaderForTable = partitionLeader;
+    } else {
+      // Get Helix leader to be the leader to this table.
+      String helixLeader = getHelixClusterLeader();
+      if (helixLeader != null) {
+        leaderForTable = helixLeader;
+      } else {
+        leaderForTable = null;
+      }
+    }
+    return leaderForTable;
+  }
+
+  /**
+   * Gets partition leader from lead controller resource.
+   * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state.
+   *
+   * @param leadControllerResourceExternalView external view of lead controller resource
+   * @param rawTableName table name without type
+   * @return leader of partition, null if not found.
+   */
+  private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) {
+    if (leadControllerResourceExternalView == null) {
+      return null;
+    }
+    Set<String> partitionSet = leadControllerResourceExternalView.getPartitionSet();
+    if (partitionSet == null || partitionSet.isEmpty()) {
+      return null;
+    }
+    int numPartitions = partitionSet.size();
+    int partitionIndex = rawTableName.hashCode() % numPartitions;
+    String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex;
+    Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName);
+
+    for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+      if ("MASTER".equals(entry.getValue())) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets Helix leader in the cluster. Null if there is no leader.
+   * @return Helix leader.
+   */
+  private String getHelixClusterLeader() {
     BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor();
     Stat stat = new Stat();
     try {
-      ZNRecord znRecord =
-          dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      String leader = znRecord.getId();
-      int index = leader.lastIndexOf('_');
-      String leaderHost = leader.substring(0, index);
-      int leaderPort = Integer.valueOf(leader.substring(index + 1));
-      _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort);
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort,
-          stat.getVersion(), stat.getMtime());
-      return _controllerLeaderHostPort;
+      ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
+      String helixLeader = znRecord.getId();
+      LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime());
+      return helixLeader;
     } catch (Exception e) {
-      LOGGER.warn("Could not locate controller leader, exception", e);
-      _cachedControllerLeaderInvalid = true;
+      LOGGER.warn("Could not locate Helix leader", e);
       return null;
     }
   }
 
+  private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) {
+    int index = controllerLeaderId.lastIndexOf('_');
+    String leaderHost = controllerLeaderId.substring(0, index);
+    int leaderPort = Integer.valueOf(controllerLeaderId.substring(index + 1));
+    return new Pair<>(leaderHost, leaderPort);
+  }
+
   /**
    * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag.
    * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned.
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index bcfa7ee..5c75f1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Map;
 import javax.net.ssl.SSLContext;
 import org.apache.commons.configuration.Configuration;
+import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final ServerMetrics _serverMetrics;
+  private final String _rawTableName;
 
   public static void init(Configuration uploaderConfig) {
     Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL);
@@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler {
         uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS);
   }
 
-  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) {
+  public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) {
     _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext);
     _serverMetrics = serverMetrics;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
   }
 
   public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
@@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler {
 
   private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) {
     ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance();
-    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader();
+    final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName);
     if (leaderHostPort == null) {
       LOGGER.warn("No leader found while trying to send {}", request.toString());
       return null;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 3d7bfce..8ff15b8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -111,7 +111,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
   private RealtimeTableDataManager createTableDataManager() {
     final String instanceId = "server-1";
-    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()));
+    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName);
     RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
     when(tableDataManager.getServerInstance()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
index 34e955d..5f0cdbc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.server.realtime;
 
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
@@ -35,6 +36,7 @@ import static org.mockito.Mockito.when;
 
 
 public class ControllerLeaderLocatorTest {
+  private String testTable = "testTable";
 
   /**
    * Tests the invalidate logic for cached controller leader
@@ -45,6 +47,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -54,6 +57,8 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("testCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
@@ -78,7 +83,7 @@ public class ControllerLeaderLocatorTest {
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
     // getControllerLeader, which validates the cache
-    controllerLeaderLocator.getControllerLeader();
+    controllerLeaderLocator.getControllerLeader(testTable);
     Assert.assertFalse(controllerLeaderLocator.isCachedControllerLeaderInvalid());
     Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis);
 
@@ -104,16 +109,19 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
 
     when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
     when(helixDataAccessor.getBaseDataAccessor()).thenReturn(baseDataAccessor);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenThrow(new RuntimeException());
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(), null);
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable), null);
   }
 
   @Test
@@ -121,6 +129,7 @@ public class ControllerLeaderLocatorTest {
     HelixManager helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class);
+    HelixAdmin helixAdmin = mock(HelixAdmin.class);
     ZNRecord znRecord = mock(ZNRecord.class);
     final String leaderHost = "host";
     final int leaderPort = 12345;
@@ -130,14 +139,16 @@ public class ControllerLeaderLocatorTest {
     when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort);
     when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenReturn(znRecord);
     when(helixManager.getClusterName()).thenReturn("myCluster");
+    when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin);
+    when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null);
 
     // Create Controller Leader Locator
     FakeControllerLeaderLocator.create(helixManager);
     ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance();
 
     Pair<String, Integer> expectedLeaderLocation = new Pair<>(leaderHost, leaderPort);
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getFirst(), expectedLeaderLocation.getFirst());
-    Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getSecond(), expectedLeaderLocation.getSecond());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getFirst(), expectedLeaderLocation.getFirst());
+    Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getSecond(), expectedLeaderLocation.getSecond());
   }
 
   static class FakeControllerLeaderLocator extends ControllerLeaderLocator {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 7e7b041..f6cd400 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -134,7 +134,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
 
     // Now report to the controller that we had to stop consumption
     ServerSegmentCompletionProtocolHandler protocolHandler =
-        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()));
+        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName);
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason")
         .withInstanceId(_serverInstance);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/02: Add logic for lead controller resource on controller side

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-logic-for-lead-controller-resource
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit df041cbd0541f04164a0e644bed37fc43d1f44f1
Author: jackjlli <jl...@linkedin.com>
AuthorDate: Fri Jun 14 16:35:14 2019 -0700

    Add logic for lead controller resource on controller side
---
 .../apache/pinot/controller/ControllerStarter.java | 44 ++++++++-----
 .../pinot/controller/LeadControllerManager.java    | 76 ++++++++++++++++++++++
 .../PinotSegmentUploadRestletResource.java         |  7 +-
 .../controller/api/upload/SegmentValidator.java    | 10 +--
 .../controller/helix/SegmentStatusChecker.java     |  8 ++-
 .../helix/core/PinotHelixResourceManager.java      | 19 +++++-
 .../helix/core/minion/PinotTaskManager.java        |  8 ++-
 .../core/periodictask/ControllerPeriodicTask.java  |  9 ++-
 .../ControllerPeriodicTaskScheduler.java           |  8 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 32 ++++-----
 .../core/realtime/SegmentCompletionManager.java    | 54 ++++++++-------
 .../core/relocation/RealtimeSegmentRelocator.java  |  8 ++-
 .../helix/core/retention/RetentionManager.java     |  8 ++-
 .../core/statemodel/LeadControllerChecker.java     | 55 ++++++++++++++++
 ...rollerResourceMasterSlaveStateModelFactory.java | 64 ++++++++++++++++++
 .../BrokerResourceValidationManager.java           |  5 +-
 .../validation/OfflineSegmentIntervalChecker.java  | 13 ++--
 .../RealtimeSegmentValidationManager.java          |  8 ++-
 .../controller/validation/StorageQuotaChecker.java | 14 ++--
 .../controller/helix/SegmentStatusCheckerTest.java | 73 ++++++++++++++++++---
 .../periodictask/ControllerPeriodicTaskTest.java   |  7 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  7 +-
 .../helix/core/realtime/SegmentCompletionTest.java | 10 ++-
 .../relocation/RealtimeSegmentRelocatorTest.java   | 14 ++--
 .../helix/core/retention/RetentionManagerTest.java | 12 +++-
 .../validation/StorageQuotaCheckerTest.java        | 27 ++++----
 26 files changed, 455 insertions(+), 145 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index c313616..a8fd1f7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -113,6 +113,7 @@ public class ControllerStarter {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private SegmentCompletionManager _segmentCompletionManager;
   private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbackList;
 
   public ControllerStarter(ControllerConf conf) {
@@ -255,6 +256,10 @@ public class ControllerStarter {
     _helixResourceManager.start();
     HelixManager helixParticipantManager = _helixResourceManager.getHelixZkManager();
 
+    // Get lead controller manager from resource manager
+    _leadControllerManager = _helixResourceManager.getLeadControllerManager();
+    _leadControllerManager.registerControllerLeadershipManager(_controllerLeadershipManager);
+
     LOGGER.info("Registering controller leadership manager");
     // TODO: when Helix separation is completed, leadership only depends on the master in leadControllerResource, remove
     //       ControllerLeadershipManager and this callback.
@@ -266,15 +271,13 @@ public class ControllerStarter {
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
-
     _pinotLLCRealtimeSegmentManager =
-        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics,
-            _controllerLeadershipManager);
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _leadControllerManager);
     // TODO: Need to put this inside HelixResourceManager when ControllerLeadershipManager is removed.
     _helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
     _segmentCompletionManager =
         new SegmentCompletionManager(helixParticipantManager, _pinotLLCRealtimeSegmentManager, _controllerMetrics,
-            _controllerLeadershipManager, _config.getSegmentCommitTimeoutSeconds());
+            _leadControllerManager, _config.getSegmentCommitTimeoutSeconds());
 
     if (_config.getHLCTablesAllowed()) {
       LOGGER.info("Realtime tables with High Level consumers will be supported");
@@ -289,7 +292,9 @@ public class ControllerStarter {
     List<PeriodicTask> controllerPeriodicTasks = setupControllerPeriodicTasks();
     LOGGER.info("Init controller periodic tasks scheduler");
     _controllerPeriodicTaskScheduler = new ControllerPeriodicTaskScheduler();
-    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _controllerLeadershipManager);
+    _controllerPeriodicTaskScheduler.init(controllerPeriodicTasks, _leadControllerManager);
+
+    _controllerPeriodicTaskScheduler.start();
 
     LOGGER.info("Registering rebalance segments factory");
     _helixResourceManager
@@ -327,7 +332,7 @@ public class ControllerStarter {
         bind(_controllerMetrics).to(ControllerMetrics.class);
         bind(accessControlFactory).to(AccessControlFactory.class);
         bind(metadataEventNotifierFactory).to(MetadataEventNotifierFactory.class);
-        bind(_controllerLeadershipManager).to(ControllerLeadershipManager.class);
+        bind(_leadControllerManager).to(LeadControllerManager.class);
       }
     });
 
@@ -433,24 +438,29 @@ public class ControllerStarter {
   protected List<PeriodicTask> setupControllerPeriodicTasks() {
     LOGGER.info("Setting up periodic tasks");
     List<PeriodicTask> periodicTasks = new ArrayList<>();
-    _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
+    _taskManager =
+        new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _leadControllerManager, _config,
+            _controllerMetrics);
     periodicTasks.add(_taskManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
+    _retentionManager =
+        new RetentionManager(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
-            _controllerMetrics);
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager,
+            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
     _realtimeSegmentValidationManager =
-        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _pinotLLCRealtimeSegmentManager,
-            new ValidationMetrics(_metricsRegistry), _controllerMetrics);
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
+            _pinotLLCRealtimeSegmentManager, new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
-        new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
+        new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
-    _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
+    _segmentStatusChecker =
+        new SegmentStatusChecker(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
+    _realtimeSegmentRelocator =
+        new RealtimeSegmentRelocator(_helixResourceManager, _leadControllerManager, _config, _controllerMetrics);
     periodicTasks.add(_realtimeSegmentRelocator);
 
     return periodicTasks;
@@ -482,6 +492,10 @@ public class ControllerStarter {
       LOGGER.info("Stopping controller leadership manager");
       _controllerLeadershipManager.stop();
 
+      // Stop controller periodic task.
+      LOGGER.info("Stopping controller periodic tasks");
+      _controllerPeriodicTaskScheduler.stop();
+
       // Stop PinotLLCSegmentManager before stopping Jersey API. It is possible that stopping Jersey API
       // may interrupt the handlers waiting on an I/O.
       _pinotLLCRealtimeSegmentManager.stop();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
new file mode 100644
index 0000000..9e4d3fb
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerChecker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+public class LeadControllerManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerManager.class);
+
+  private Map<String, LeadershipChangeSubscriber> _subscribers = new HashMap<>();
+
+  private final LeadControllerChecker _leadControllerChecker;
+  private ControllerLeadershipManager _controllerLeadershipManager;
+
+  public LeadControllerManager() {
+    _leadControllerChecker = new LeadControllerChecker();
+  }
+
+  public void registerControllerLeadershipManager(ControllerLeadershipManager controllerLeadershipManager) {
+    _controllerLeadershipManager = controllerLeadershipManager;
+  }
+
+  public boolean isLeaderForTable(String tableName) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    int partitionIndex = rawTableName.hashCode() % NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+    if (_leadControllerChecker.isPartitionLeader(partitionIndex)) {
+      return true;
+    } else if (_controllerLeadershipManager != null) {
+      return _controllerLeadershipManager.isLeader();
+    } else {
+      return false;
+    }
+  }
+
+  public synchronized void addPartitionLeader(String partitionName) {
+    _leadControllerChecker.addPartitionLeader(partitionName);
+  }
+
+  public synchronized void removePartitionLeader(String partitionName) {
+    _leadControllerChecker.removePartitionLeader(partitionName);
+  }
+
+  public void subscribe(String className, LeadershipChangeSubscriber subscriber) {
+    LOGGER.info("{} subscribing to leadership changes", className);
+    _subscribers.put(className, subscriber);
+//      subscriber.onBecomingLeader();
+  }
+
+  public void onLeadControllerChange() {
+
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bd4b8a0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -57,7 +57,6 @@ import javax.ws.rs.core.Response;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -70,7 +69,7 @@ import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.access.AccessControl;
 import org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.upload.SegmentValidator;
@@ -117,7 +116,7 @@ public class PinotSegmentUploadRestletResource {
   AccessControlFactory _accessControlFactory;
 
   @Inject
-  ControllerLeadershipManager _controllerLeadershipManager;
+  LeadControllerManager _leadControllerManager;
 
   @GET
   @Produces(MediaType.APPLICATION_JSON)
@@ -325,7 +324,7 @@ public class PinotSegmentUploadRestletResource {
       // Validate segment
       SegmentValidatorResponse segmentValidatorResponse =
           new SegmentValidator(_pinotHelixResourceManager, _controllerConf, _executor, _connectionManager,
-              _controllerMetrics, _controllerLeadershipManager)
+              _controllerMetrics, _leadControllerManager)
               .validateSegment(rawTableName, segmentMetadata, tempSegmentDir);
 
       // Zk operations
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
index 39a9657..42c7cf8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentValidator.java
@@ -34,7 +34,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
@@ -55,17 +55,17 @@ public class SegmentValidator {
   private final Executor _executor;
   private final HttpConnectionManager _connectionManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public SegmentValidator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf controllerConf,
       Executor executor, HttpConnectionManager connectionManager, ControllerMetrics controllerMetrics,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _pinotHelixResourceManager = pinotHelixResourceManager;
     _controllerConf = controllerConf;
     _executor = executor;
     _connectionManager = connectionManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public SegmentValidatorResponse validateSegment(String rawTableName, SegmentMetadata segmentMetadata,
@@ -135,7 +135,7 @@ public class SegmentValidator {
         new TableSizeReader(_executor, _connectionManager, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker quotaChecker =
         new StorageQuotaChecker(offlineTableConfig, tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     return quotaChecker.isSegmentStorageWithinQuota(segmentFile, metadata.getName(),
         _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index f40e2b3..9261420 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -56,10 +57,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh
    * @param pinotHelixResourceManager The resource checker used to interact with Helix
    * @param config The controller configuration object
    */
-  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index a4d4d0b..e33ee62 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,7 +53,6 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -99,6 +98,7 @@ import org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -106,6 +106,7 @@ import org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategy;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyEnum;
 import org.apache.pinot.controller.helix.core.sharding.SegmentAssignmentStrategyFactory;
+import org.apache.pinot.controller.helix.core.statemodel.LeadControllerResourceMasterSlaveStateModelFactory;
 import org.apache.pinot.controller.helix.core.util.ZKMetadataUtils;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
@@ -144,6 +145,7 @@ public class PinotHelixResourceManager {
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
+  private LeadControllerManager _leadControllerManager;
 
   public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String helixClusterName,
       @Nonnull String controllerInstanceId, String dataDir, long externalViewOnlineToOfflineTimeoutMillis,
@@ -170,6 +172,7 @@ public class PinotHelixResourceManager {
    * Create Helix cluster if needed, and then start a Pinot controller instance.
    */
   public synchronized void start() {
+    _leadControllerManager = new LeadControllerManager();
     _helixZkManager = registerAndConnectAsHelixParticipant();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
@@ -255,6 +258,16 @@ public class PinotHelixResourceManager {
     return _propertyStore;
   }
 
+
+  /**
+   * Get lead controller manager.
+   *
+   * @return lead controller manager
+   */
+  public LeadControllerManager getLeadControllerManager() {
+    return _leadControllerManager;
+  }
+
   /**
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
@@ -263,8 +276,8 @@ public class PinotHelixResourceManager {
         HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, _helixZkURL);
 
     // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource.
-    helixManager.getStateMachineEngine()
-        .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory());
+    helixManager.getStateMachineEngine().registerStateModelFactory(MasterSlaveSMD.name,
+        new LeadControllerResourceMasterSlaveStateModelFactory(_leadControllerManager));
 
     try {
       helixManager.connect();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 7a09c00..649b966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.config.TableTaskConfig;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
 import org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
@@ -51,10 +52,11 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
 
   public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
-      PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerConf controllerConf, ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, controllerMetrics);
+        controllerConf.getPinotTaskManagerInitialDelaySeconds(), helixResourceManager, leadControllerManager,
+        controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 7e764f3..76d106f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -23,6 +23,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -40,12 +41,15 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final LeadControllerManager _leadControllerManager;
   protected final ControllerMetrics _controllerMetrics;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+      PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager,
+      ControllerMetrics controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _leadControllerManager = leadControllerManager;
     _controllerMetrics = controllerMetrics;
   }
 
@@ -75,6 +79,9 @@ public abstract class ControllerPeriodicTask<C> extends BasePeriodicTask {
         LOGGER.info("Task: {} is stopped, early terminate the task", _taskName);
         break;
       }
+      if (!_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+        continue;
+      }
       try {
         processTable(tableNameWithType, context);
       } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
index 8a2b63c..858bfca 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskScheduler.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.controller.helix.core.periodictask;
 
 import java.util.List;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.LeadershipChangeSubscriber;
 import org.apache.pinot.core.periodictask.PeriodicTask;
 import org.apache.pinot.core.periodictask.PeriodicTaskScheduler;
@@ -39,11 +39,11 @@ public class ControllerPeriodicTaskScheduler extends PeriodicTaskScheduler imple
    * Initialize the {@link ControllerPeriodicTaskScheduler} with the list of {@link ControllerPeriodicTask} created at startup
    * This is called only once during controller startup
    * @param controllerPeriodicTasks
-   * @param controllerLeadershipManager
+   * @param leadControllerManager
    */
-  public void init(List<PeriodicTask> controllerPeriodicTasks, ControllerLeadershipManager controllerLeadershipManager) {
+  public void init(List<PeriodicTask> controllerPeriodicTasks, LeadControllerManager leadControllerManager) {
     super.init(controllerPeriodicTasks);
-    controllerLeadershipManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
+    leadControllerManager.subscribe(ControllerPeriodicTaskScheduler.class.getName(), this);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..0982c30 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -66,7 +66,7 @@ import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -123,13 +123,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final TableConfigCache _tableConfigCache;
   private final StreamPartitionAssignmentGenerator _streamPartitionAssignmentGenerator;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   private volatile boolean _isStopping = false;
   private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager) {
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager) {
     _helixAdmin = helixResourceManager.getHelixAdmin();
     _helixManager = helixResourceManager.getHelixZkManager();
     _propertyStore = helixResourceManager.getPropertyStore();
@@ -145,7 +145,7 @@ public class PinotLLCRealtimeSegmentManager {
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _streamPartitionAssignmentGenerator = new StreamPartitionAssignmentGenerator(_helixManager);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
 
@@ -182,8 +182,8 @@ public class PinotLLCRealtimeSegmentManager {
     LOGGER.info("Wait completed: Number of completing segments = {}", _numCompletingSegments.get());
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 
   protected boolean isConnected() {
@@ -341,10 +341,10 @@ public class PinotLLCRealtimeSegmentManager {
     URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
     PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(tableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment file {}, {} for table {}: isLeader={}, isConnected={}",
-          segmentName, segmentLocation, tableName, isLeader(), isConnected());
+          segmentName, segmentLocation, tableName, isLeader(tableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
@@ -540,17 +540,17 @@ public class PinotLLCRealtimeSegmentManager {
     final String oldZnodePath =
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, committingSegmentNameStr);
 
-    if (!isConnected() || !isLeader()) {
+    if (!isConnected() || !isLeader(realtimeTableName)) {
       // We can potentially log a different value than what we saw ....
       LOGGER.warn("Lost leadership while committing segment metadata for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return false;
     }
     boolean success = writeSegmentToPropertyStore(oldZnodePath, oldZnRecord, realtimeTableName, stat.getVersion());
     if (!success) {
       LOGGER.warn("Fail to write old segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          committingSegmentNameStr, realtimeTableName, isLeader(), isConnected());
+          committingSegmentNameStr, realtimeTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -599,11 +599,11 @@ public class PinotLLCRealtimeSegmentManager {
         ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, newSegmentNameStr);
 
     if (!isNewTableSetup) {
-      if (!isLeader() || !isConnected()) {
+      if (!isLeader(realtimeTableName) || !isConnected()) {
         // We can potentially log a different value than what we saw ....
         LOGGER.warn(
             "Lost leadership while committing new segment metadata for {} for table {}: isLeader={}, isConnected={}",
-            newSegmentNameStr, rawTableName, isLeader(), isConnected());
+            newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
         _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
         return false;
       }
@@ -612,7 +612,7 @@ public class PinotLLCRealtimeSegmentManager {
     boolean success = writeSegmentToPropertyStore(newZnodePath, newZnRecord, realtimeTableName);
     if (!success) {
       LOGGER.warn("Fail to write new segment to property store for {} for table {}: isLeader={}, isConnected={}",
-          newSegmentNameStr, rawTableName, isLeader(), isConnected());
+          newSegmentNameStr, rawTableName, isLeader(realtimeTableName), isConnected());
     }
     return success;
   }
@@ -1347,8 +1347,4 @@ public class PinotLLCRealtimeSegmentManager {
 
     return idealState;
   }
-
-  public ControllerLeadershipManager getControllerLeadershipManager() {
-    return _controllerLeadershipManager;
-  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index a4da6b4..8935c6b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
-import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -37,12 +35,13 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.utils.SegmentName.SEPARATOR;
+
 
 /**
  * This is a singleton class in the controller that drives the state machines for segments that are in the
@@ -73,7 +72,7 @@ public class SegmentCompletionManager {
   private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<>();
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
   private final Lock[] _fsmLocks;
   private static final int NUM_FSM_LOCKS = 20;
 
@@ -87,12 +86,12 @@ public class SegmentCompletionManager {
   // TODO keep some history of past committed segments so that we can avoid looking up PROPERTYSTORE if some server comes in late.
 
   public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
-      ControllerMetrics controllerMetrics, ControllerLeadershipManager controllerLeadershipManager,
+      ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager,
       int segmentCommitTimeoutSeconds) {
     _helixManager = helixManager;
     _segmentManager = segmentManager;
     _controllerMetrics = controllerMetrics;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
     SegmentCompletionProtocol
         .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
     _fsmLocks = new Lock[NUM_FSM_LOCKS];
@@ -163,11 +162,12 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it continues to consume).
    */
   public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final String stopReason = reqParams.getReason();
     final long offset = reqParams.getOffset();
@@ -201,11 +201,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(
       final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
@@ -225,11 +226,12 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final int extTimeSec = reqParams.getExtraTimeSec();
@@ -256,11 +258,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(
       SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     final String instanceId = reqParams.getInstanceId();
     final long offset = reqParams.getOffset();
     final String reason = reqParams.getReason();
@@ -292,11 +295,12 @@ public class SegmentCompletionManager {
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams,
       boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
-    if (!isLeader() || !_helixManager.isConnected()) {
+    final String segmentNameStr = reqParams.getSegmentName();
+    final String tableName = segmentNameStr.split(SEPARATOR)[0];
+    if (!isLeader(tableName) || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
-    final String segmentNameStr = reqParams.getSegmentName();
     LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
     SegmentCompletionFSM fsm = null;
     SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.RESP_FAILED;
@@ -352,6 +356,7 @@ public class SegmentCompletionManager {
     State _state = State.HOLDING;   // Typically start off in HOLDING state.
     final long _startTimeMs;
     private final LLCSegmentName _segmentName;
+    private final String _realtimeTableName;
     private final int _numReplicas;
     private final Set<String> _excludedServerStateMap;
     private final Map<String, Long> _commitStateMap;
@@ -394,6 +399,7 @@ public class SegmentCompletionManager {
     private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager,
         SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
       _segmentName = segmentName;
+      _realtimeTableName = _segmentName.getTableName();
       _numReplicas = numReplicas;
       _segmentManager = segmentManager;
       _commitStateMap = new HashMap<>(_numReplicas);
@@ -403,8 +409,8 @@ public class SegmentCompletionManager {
       _maxTimeToPickWinnerMs = _startTimeMs + MAX_TIME_TO_PICK_WINNER_MS;
       _maxTimeToNotifyWinnerMs = _startTimeMs + MAX_TIME_TO_NOTIFY_WINNER_MS;
       long initialCommitTimeMs =
-          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_segmentName.getTableName());
-      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(segmentName.getTableName());
+          MAX_TIME_TO_NOTIFY_WINNER_MS + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
+      Long savedCommitTime = _segmentCompletionManager._commitTimeMap.get(_realtimeTableName);
       if (savedCommitTime != null && savedCommitTime > initialCommitTimeMs) {
         initialCommitTimeMs = savedCommitTime;
       }
@@ -413,7 +419,7 @@ public class SegmentCompletionManager {
         // The table has a really high value configured for max commit time. Set it to a higher value than default
         // and go from there.
         LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", initialCommitTimeMs / 1000,
-            segmentName.getTableName(), MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
+            _realtimeTableName, MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS);
         initialCommitTimeMs = MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS * 1000;
       }
       _initialCommitTimeMs = initialCommitTimeMs;
@@ -681,14 +687,14 @@ public class SegmentCompletionManager {
     private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return hold(instanceId, offset);
     }
 
     private SegmentCompletionProtocol.Response abortAndReturnFailed() {
       _state = State.ABORTED;
       _segmentCompletionManager._controllerMetrics
-          .addMeteredTableValue(_segmentName.getTableName(), ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
+          .addMeteredTableValue(_realtimeTableName, ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1);
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
@@ -1117,7 +1123,7 @@ public class SegmentCompletionManager {
   }
 
   @VisibleForTesting
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index ea5b05b..2c2c017 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -55,10 +56,11 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask<Void> {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
-  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentRelocationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 7d5182d..fecb4ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -54,10 +55,11 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics controllerMetrics) {
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
     super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
-        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
+        controllerMetrics);
     _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
new file mode 100644
index 0000000..69b56a3
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerChecker.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
+
+
+public class LeadControllerChecker {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerChecker.class);
+
+  private Map<Integer, String> _partitionCache;
+
+  public LeadControllerChecker() {
+    _partitionCache = new ConcurrentHashMap<>();
+  }
+
+  public void addPartitionLeader(String partitionName) {
+    LOGGER.info("Add Partition: {} to LeadControllerChecker", partitionName);
+    int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+    _partitionCache.putIfAbsent(partitionIndex, null);
+  }
+
+  public void removePartitionLeader(String partitionName) {
+    LOGGER.info("Remove Partition: {} from LeadControllerChecker", partitionName);
+    int partitionIndex = Integer.parseInt(partitionName.substring(partitionName.lastIndexOf("_") + 1));
+    _partitionCache.remove(partitionIndex);
+  }
+
+  public boolean isPartitionLeader(int partitionIndex) {
+    Preconditions.checkArgument(partitionIndex >= 0 && partitionIndex < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, "Invalid partition index: " + partitionIndex);
+    return _partitionCache.containsKey(partitionIndex);
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
new file mode 100644
index 0000000..d063c58
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/statemodel/LeadControllerResourceMasterSlaveStateModelFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.statemodel;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeadControllerResourceMasterSlaveStateModelFactory extends MasterSlaveStateModelFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LeadControllerResourceMasterSlaveStateModelFactory.class);
+
+  private final LeadControllerManager _leadControllerManager;
+
+  public LeadControllerResourceMasterSlaveStateModelFactory(LeadControllerManager leadControllerManager) {
+    super();
+    _leadControllerManager = leadControllerManager;
+  }
+
+  @Override
+  public StateModel createNewStateModel(String resourceName, String partitionName) {
+    MasterSlaveStateModel stateModel = new LeadControllerResourceMasterSlaveStateModel();
+    stateModel.setPartitionName(partitionName);
+    return stateModel;
+  }
+
+  public class LeadControllerResourceMasterSlaveStateModel extends MasterSlaveStateModel {
+    @Override
+    public void onBecomeSlaveFromMaster(Message message, NotificationContext context) {
+      super.onBecomeSlaveFromMaster(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.addPartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+
+    @Override
+    public void onBecomeMasterFromSlave(Message message, NotificationContext context) {
+      super.onBecomeMasterFromSlave(message, context);
+      String partitionName = message.getPartitionName();
+      _leadControllerManager.removePartitionLeader(partitionName);
+      _leadControllerManager.onLeadControllerChange();
+    }
+  }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index 5748d3c..a82a161 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -24,6 +24,7 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.slf4j.Logger;
@@ -37,9 +38,9 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask<Brok
   private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
 
   public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getBrokerResourceValidationInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7f19395..0eeadc7 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.util.SegmentIntervalUtils;
@@ -50,9 +51,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics,
+      ControllerMetrics controllerMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _validationMetrics = validationMetrics;
   }
 
@@ -88,12 +91,12 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
         if (SegmentIntervalUtils.isValidInterval(timeInterval)) {
           segmentIntervals.add(timeInterval);
         } else {
-          numSegmentsWithInvalidIntervals ++;
+          numSegmentsWithInvalidIntervals++;
         }
       }
       if (numSegmentsWithInvalidIntervals > 0) {
-        LOGGER.warn("Table: {} has {} segments with invalid interval", offlineTableName,
-            numSegmentsWithInvalidIntervals);
+        LOGGER
+            .warn("Table: {} has {} segments with invalid interval", offlineTableName, numSegmentsWithInvalidIntervals);
       }
       Duration frequency = SegmentIntervalUtils.convertToDuration(validationConfig.getSegmentPushFrequency());
       numMissingSegments = computeNumMissingSegments(segmentIntervals, frequency);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5eb5a6f..88ad642 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
@@ -53,10 +54,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
-      ControllerMetrics controllerMetrics) {
+      LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
+      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager, controllerMetrics);
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
index db23301..c78c756 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/StorageQuotaChecker.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.DataSize;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.slf4j.Logger;
@@ -48,16 +48,16 @@ public class StorageQuotaChecker {
   private final TableConfig _tableConfig;
   private final ControllerMetrics _controllerMetrics;
   private final PinotHelixResourceManager _pinotHelixResourceManager;
-  private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final LeadControllerManager _leadControllerManager;
 
   public StorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
       ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-      ControllerLeadershipManager controllerLeadershipManager) {
+      LeadControllerManager leadControllerManager) {
     _tableConfig = tableConfig;
     _tableSizeReader = tableSizeReader;
     _controllerMetrics = controllerMetrics;
     _pinotHelixResourceManager = pinotHelixResourceManager;
-    _controllerLeadershipManager = controllerLeadershipManager;
+    _leadControllerManager = leadControllerManager;
   }
 
   public static class QuotaCheckerResponse {
@@ -157,7 +157,7 @@ public class StorageQuotaChecker {
         tableNameWithType, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader(tableNameWithType) && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -213,7 +213,7 @@ public class StorageQuotaChecker {
     }
   }
 
-  protected boolean isLeader() {
-    return _controllerLeadershipManager.isLeader();
+  protected boolean isLeader(String tableName) {
+    return _leadControllerManager.isLeaderForTable(tableName);
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index c235483..066040e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -35,10 +35,12 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,6 +48,7 @@ import static org.mockito.Mockito.when;
 public class SegmentStatusCheckerTest {
   private SegmentStatusChecker segmentStatusChecker;
   private PinotHelixResourceManager helixResourceManager;
+  private LeadControllerManager leadControllerManager;
   private MetricsRegistry metricsRegistry;
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
@@ -84,9 +87,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -146,9 +154,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -222,9 +235,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(0);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -264,9 +282,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -291,9 +314,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
@@ -349,9 +377,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(
@@ -390,9 +423,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
@@ -429,9 +467,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -463,9 +506,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 0);
     // update metrics
@@ -508,9 +556,14 @@ public class SegmentStatusCheckerTest {
       when(config.getStatusCheckerFrequencyInSeconds()).thenReturn(300);
       when(config.getStatusCheckerWaitForPushTimeInSeconds()).thenReturn(300);
     }
+    {
+      leadControllerManager = mock(LeadControllerManager.class);
+      when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker =
+        new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
     segmentStatusChecker.start();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index a928922..dcd5469 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -27,10 +27,12 @@ import java.util.stream.IntStream;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -43,6 +45,7 @@ public class ControllerPeriodicTaskTest {
   private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+  private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class);
   private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
   private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
@@ -52,7 +55,8 @@ public class ControllerPeriodicTaskTest {
   private static final String TASK_NAME = "TestTask";
 
   private final ControllerPeriodicTask _task = new ControllerPeriodicTask<Void>(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _leadControllerManager,
+      _controllerMetrics) {
 
     @Override
     protected void setUpTask() {
@@ -81,6 +85,7 @@ public class ControllerPeriodicTaskTest {
     List<String> tables = new ArrayList<>(_numTables);
     IntStream.range(0, _numTables).forEach(i -> tables.add("table_" + i + " _OFFLINE"));
     when(_resourceManager.getAllTables()).thenReturn(tables);
+    when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
   }
 
   private void resetState() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 04e2b7b..af35420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -56,7 +56,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.api.resources.LLCSegmentCompletionHandlers;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
@@ -66,7 +66,6 @@ import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
 import org.apache.pinot.filesystem.PinotFSFactory;
 import org.apache.zookeeper.data.Stat;
@@ -1323,7 +1322,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     protected FakePinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         List<String> existingLLCSegments, ControllerMetrics controllerMetrics) {
       super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+          new LeadControllerManager());
 
       try {
         TableConfigCache mockCache = mock(TableConfigCache.class);
@@ -1513,7 +1512,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return IS_LEADER;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..22e317f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.zookeeper.data.Stat;
@@ -40,7 +40,6 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
 import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -1151,8 +1150,7 @@ public class SegmentCompletionTest {
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
         ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics,
-          new ControllerLeadershipManager(pinotHelixResourceManager.getHelixZkManager(), controllerMetrics));
+      super(pinotHelixResourceManager, CONTROLLER_CONF, controllerMetrics, new LeadControllerManager());
     }
 
     @Override
@@ -1210,7 +1208,7 @@ public class SegmentCompletionTest {
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader, ControllerMetrics controllerMetrics) {
       super(helixManager, segmentManager, controllerMetrics,
-          new ControllerLeadershipManager(helixManager, controllerMetrics),
+          new LeadControllerManager(),
           SegmentCompletionProtocol.getDefaultMaxSegmentCommitTimeSeconds());
       _isLeader = isLeader;
     }
@@ -1221,7 +1219,7 @@ public class SegmentCompletionTest {
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return _isLeader;
     }
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 8c5c8ec..9316062 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -34,12 +34,14 @@ import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -48,6 +50,7 @@ public class RealtimeSegmentRelocatorTest {
 
   private TestRealtimeSegmentRelocator _realtimeSegmentRelocator;
   private HelixManager _mockHelixManager;
+  private LeadControllerManager _leadControllerManager;
 
   private String[] serverNames;
   private String[] consumingServerNames;
@@ -69,10 +72,13 @@ public class RealtimeSegmentRelocatorTest {
     PinotHelixResourceManager mockPinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     _mockHelixManager = mock(HelixManager.class);
     when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
+    LeadControllerManager mockLeadControllerManager = mock(LeadControllerManager.class);
+    when(mockLeadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     ControllerConf controllerConfig = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _realtimeSegmentRelocator =
-        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
+        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, mockLeadControllerManager, controllerConfig,
+            controllerMetrics);
 
     final int maxInstances = 20;
     serverNames = new String[maxInstances];
@@ -268,9 +274,9 @@ public class RealtimeSegmentRelocatorTest {
 
     private Map<String, List<String>> tagToInstances;
 
-    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-        ControllerMetrics controllerMetrics) {
-      super(pinotHelixResourceManager, config, controllerMetrics);
+    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
+        LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
+      super(pinotHelixResourceManager, leadControllerManager, config, controllerMetrics);
       tagToInstances = new HashedMap();
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d4adfe1..465a9af 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,6 +85,9 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
@@ -91,7 +95,8 @@ public class RetentionManagerTest {
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
@@ -210,11 +215,14 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
+    LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, leadControllerManager, conf, controllerMetrics);
     retentionManager.start();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index e84a947..f8fb6ca 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.ControllerLeadershipManager;
+import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.mockito.invocation.InvocationOnMock;
@@ -49,7 +49,7 @@ public class StorageQuotaCheckerTest {
   private TableConfig _tableConfig;
   private ControllerMetrics _controllerMetrics;
   private PinotHelixResourceManager _pinotHelixResourceManager;
-  private ControllerLeadershipManager _controllerLeadershipManager;
+  private LeadControllerManager _leadControllerManager;
   private QuotaConfig _quotaConfig;
   private SegmentsValidationAndRetentionConfig _validationConfig;
   private static final File TEST_DIR = new File(StorageQuotaCheckerTest.class.getName());
@@ -62,7 +62,7 @@ public class StorageQuotaCheckerTest {
     _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     _validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
-    _controllerLeadershipManager = mock(ControllerLeadershipManager.class);
+    _leadControllerManager = mock(LeadControllerManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
     TEST_DIR.mkdirs();
@@ -78,10 +78,9 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -90,11 +89,10 @@ public class StorageQuotaCheckerTest {
       throws InvalidConfigException {
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
+            _leadControllerManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
-    StorageQuotaChecker.QuotaCheckerResponse res =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
+    StorageQuotaChecker.QuotaCheckerResponse res = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment", 1000);
     Assert.assertTrue(res.isSegmentWithinQuota);
   }
 
@@ -134,9 +132,8 @@ public class StorageQuotaCheckerTest {
     when(_quotaConfig.getStorage()).thenReturn("3K");
     StorageQuotaChecker checker =
         new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager,
-            _controllerLeadershipManager);
-    StorageQuotaChecker.QuotaCheckerResponse response =
-        checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
+            _leadControllerManager);
+    StorageQuotaChecker.QuotaCheckerResponse response = checker.isSegmentStorageWithinQuota(TEST_DIR, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
     Assert.assertEquals(
         _controllerMetrics.getValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION), 80L);
@@ -184,12 +181,12 @@ public class StorageQuotaCheckerTest {
 
     public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
         ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager,
-        ControllerLeadershipManager controllerLeadershipManager) {
-      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, controllerLeadershipManager);
+        LeadControllerManager leadControllerManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager, leadControllerManager);
     }
 
     @Override
-    protected boolean isLeader() {
+    protected boolean isLeader(String tableName) {
       return true;
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org