You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/11 03:00:36 UTC

[incubator-pinot] branch run_validation_manager_on_leadership_change created (now b2a761a)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a change to branch run_validation_manager_on_leadership_change
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at b2a761a  Add ControllerLeadershipManager as single place to check controller leadership changes

This branch includes the following new commits:

     new b2a761a  Add ControllerLeadershipManager as single place to check controller leadership changes

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add ControllerLeadershipManager as single place to check controller leadership changes

Posted by ne...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch run_validation_manager_on_leadership_change
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b2a761a3c4aed4ac039fd59a0cb13becd191599d
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Dec 10 19:00:06 2018 -0800

    Add ControllerLeadershipManager as single place to check controller leadership changes
---
 .../controller/ControllerChangeSubscriber.java     |  32 ++++++
 .../controller/ControllerLeadershipManager.java    | 116 +++++++++++++++++++++
 .../pinot/controller/ControllerStarter.java        |   4 +-
 .../helix/core/PinotHelixResourceManager.java      |   9 --
 .../core/periodictask/ControllerPeriodicTask.java  |   9 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  23 +---
 .../core/realtime/PinotRealtimeSegmentManager.java |  30 +++---
 .../core/realtime/SegmentCompletionManager.java    |  17 ++-
 .../controller/validation/StorageQuotaChecker.java |   7 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  43 ++++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  33 +++++-
 .../helix/core/realtime/SegmentCompletionTest.java |   7 ++
 .../helix/core/retention/RetentionManagerTest.java |  18 +++-
 .../validation/StorageQuotaCheckerTest.java        |  20 +++-
 14 files changed, 287 insertions(+), 81 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java
new file mode 100644
index 0000000..21e2945
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerChangeSubscriber.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller;
+
+/**
+ * Interface for a subscriber to the {@link ControllerLeadershipManager}
+ */
+public interface ControllerChangeSubscriber {
+
+  /**
+   * Invoked by the {@link ControllerLeadershipManager} when this controller becomes the leader
+   */
+  void onBecomingLeader();
+
+  /**
+   * Invoked by the {@link ControllerLeadershipManager} when it observes that this controller is not the leader
+   */
+  void onBecomingNonLeader();
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
new file mode 100644
index 0000000..99a8a9a
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerLeadershipManager.java
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.linkedin.pinot.controller;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single place for listening on controller changes
+ * This should be created at controller startup and everyone who wants to listen to controller changes should subscribe
+ */
+public class ControllerLeadershipManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ControllerLeadershipManager.class);
+
+  // volatile: getInstance() reads this field without holding the class lock taken by createInstance()
+  private static volatile ControllerLeadershipManager INSTANCE = null;
+
+  private final HelixManager _helixManager;
+  private volatile boolean _amILeader = false;
+
+  private final Map<String, ControllerChangeSubscriber> _subscribers = new ConcurrentHashMap<>();
+
+  private ControllerLeadershipManager(HelixManager helixManager) {
+    _helixManager = helixManager;
+    _helixManager.addControllerListener((ControllerChangeListener) notificationContext -> onControllerChange());
+  }
+
+  /**
+   * Create the singleton instance of ControllerLeadershipManager and register it as a Helix controller listener
+   * @param helixManager Helix manager used to receive controller changes and to check leadership
+   * @throws RuntimeException if an instance has already been created
+   */
+  public static synchronized void createInstance(HelixManager helixManager) {
+    if (INSTANCE != null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager already created");
+    }
+    INSTANCE = new ControllerLeadershipManager(helixManager);
+  }
+
+  /**
+   * Get the singleton instance of ControllerLeadershipManager
+   * @return the instance created by {@link #createInstance(HelixManager)}
+   * @throws RuntimeException if no instance has been created yet
+   */
+  public static ControllerLeadershipManager getInstance() {
+    if (INSTANCE == null) {
+      throw new RuntimeException("Instance of ControllerLeadershipManager not yet created");
+    }
+    return INSTANCE;
+  }
+
+  /**
+   * Callback on changes in the controller. Notifies subscribers only when this controller's leadership status flips
+   */
+  protected void onControllerChange() {
+    if (_helixManager.isLeader()) {
+      if (!_amILeader) {
+        _amILeader = true;
+        LOGGER.info("Became leader");
+        _subscribers.forEach((name, subscriber) -> subscriber.onBecomingLeader());
+      } else {
+        LOGGER.info("Already leader. Duplicate notification");
+      }
+    } else {
+      if (_amILeader) {
+        _amILeader = false;
+        LOGGER.info("Lost leadership");
+        _subscribers.forEach((name, subscriber) -> subscriber.onBecomingNonLeader());
+      } else {
+        LOGGER.info("Already not leader. Duplicate notification");
+      }
+    }
+  }
+
+  /** Returns the leadership status recorded by the most recent controller change callback */
+  public boolean isLeader() {
+    return _amILeader;
+  }
+
+  /**
+   * Subscribe to changes in the controller leadership; an existing subscription with the same name is replaced
+   * @param name key identifying the subscriber, to be passed to {@link #unsubscribe(String)} later
+   * @param subscriber callbacks to invoke when leadership is gained or lost
+   */
+  public void subscribe(String name, ControllerChangeSubscriber subscriber) {
+    _subscribers.put(name, subscriber);
+  }
+
+  /**
+   * Unsubscribe from changes in controller leadership
+   * @param name name the subscriber was registered under in {@link #subscribe(String, ControllerChangeSubscriber)}
+   */
+  public void unsubscribe(String name) {
+    _subscribers.remove(name);
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index bf25cef..8f64b6b 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -161,13 +161,15 @@ public class ControllerStarter {
     _helixResourceManager.start();
     final HelixManager helixManager = _helixResourceManager.getHelixZkManager();
 
+    LOGGER.info("Creating ControllerLeadershipManager instance");
+    ControllerLeadershipManager.createInstance(helixManager);
+
     LOGGER.info("Starting task resource manager");
     _helixTaskResourceManager = new PinotHelixTaskResourceManager(new TaskDriver(helixManager));
 
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
     PinotLLCRealtimeSegmentManager.create(_helixResourceManager, _config, _controllerMetrics);
-    PinotLLCRealtimeSegmentManager.getInstance().start();
     _realtimeSegmentsManager.start(_controllerMetrics);
 
     // Setting up periodic tasks
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
index c12f70e..f695e51 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -218,15 +218,6 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Check whether the Helix manager is the leader.
-   *
-   * @return Whether the Helix manager is the leader
-   */
-  public boolean isLeader() {
-    return _helixZkManager.isLeader();
-  }
-
-  /**
    * Get the Helix admin.
    *
    * @return Helix admin
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3d1ef6a..3b12c0f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -15,6 +15,8 @@
  */
 package com.linkedin.pinot.controller.helix.core.periodictask;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.core.periodictask.BasePeriodicTask;
 import java.util.List;
@@ -59,7 +61,7 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
 
   @Override
   public void run() {
-    if (!_pinotHelixResourceManager.isLeader()) {
+    if (!isLeader()) {
       skipLeaderTask();
     } else {
       List<String> allTableNames = _pinotHelixResourceManager.getAllTables();
@@ -109,4 +111,9 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
    * @param tables List of table names
    */
   public abstract void process(List<String> tables);
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index b1ff634..6eeaf8e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -42,6 +42,7 @@ import com.linkedin.pinot.common.utils.TarGzCompressionUtils;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.api.events.MetadataEventNotifierFactory;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -119,7 +120,6 @@ public class PinotLLCRealtimeSegmentManager {
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final PinotHelixResourceManager _helixResourceManager;
   private final String _clusterName;
-  private boolean _amILeader = false;
   private final ControllerConf _controllerConf;
   private final ControllerMetrics _controllerMetrics;
   private final int _numIdealStateUpdateLocks;
@@ -155,10 +155,6 @@ public class PinotLLCRealtimeSegmentManager {
     SegmentCompletionManager.create(helixManager, INSTANCE, controllerConf, controllerMetrics);
   }
 
-  public void start() {
-    _helixManager.addControllerListener(changeContext -> onBecomeLeader());
-  }
-
   protected PinotLLCRealtimeSegmentManager(HelixAdmin helixAdmin, String clusterName, HelixManager helixManager,
       ZkHelixPropertyStore propertyStore, PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics) {
@@ -186,24 +182,9 @@ public class PinotLLCRealtimeSegmentManager {
     return INSTANCE;
   }
 
-  private void onBecomeLeader() {
-    if (isLeader()) {
-      if (!_amILeader) {
-        // We were not leader before, now we are.
-        _amILeader = true;
-        LOGGER.info("Became leader");
-      } else {
-        // We already had leadership, nothing to do.
-        LOGGER.info("Already leader. Duplicate notification");
-      }
-    } else {
-      _amILeader = false;
-      LOGGER.info("Lost leadership");
-    }
-  }
 
   protected boolean isLeader() {
-    return _helixManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   protected boolean isConnected() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
index 07c411f..03958b9 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotRealtimeSegmentManager.java
@@ -22,10 +22,9 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.ZKMetadataProvider;
 import com.linkedin.pinot.common.metadata.instance.InstanceZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
-import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.SegmentType;
@@ -33,9 +32,12 @@ import com.linkedin.pinot.common.utils.HLCSegmentName;
 import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.helix.HelixHelper;
 import com.linkedin.pinot.common.utils.retry.RetryPolicies;
+import com.linkedin.pinot.controller.ControllerChangeSubscriber;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.core.query.utils.Pair;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -47,8 +49,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.apache.helix.ControllerChangeListener;
-import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.PropertyType;
 import org.apache.helix.ZNRecord;
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Realtime segment manager, which assigns realtime segments to server instances so that they can consume from the stream.
  */
-public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener {
+public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkChildListener, IZkDataListener, ControllerChangeSubscriber {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeSegmentManager.class);
   private static final String TABLE_CONFIG = "/CONFIGS/TABLE";
   private static final String SEGMENTS_PATH = "/SEGMENTS";
@@ -101,12 +101,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
     _zkClient.subscribeDataChanges(_tableConfigPath, this);
 
     // Subscribe to leadership changes
-    _pinotHelixResourceManager.getHelixZkManager().addControllerListener(new ControllerChangeListener() {
-      @Override
-      public void onControllerChange(NotificationContext changeContext) {
-        processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
-      }
-    });
+    ControllerLeadershipManager.getInstance().subscribe(PinotRealtimeSegmentManager.class.getName(), this);
 
     // Setup change listeners for already existing tables, if any.
     processPropertyStoreChange(_tableConfigPath);
@@ -115,6 +110,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   public void stop() {
     LOGGER.info("Stopping realtime segments manager, stopping property store.");
     _pinotHelixResourceManager.getPropertyStore().stop();
+    ControllerLeadershipManager.getInstance().unsubscribe(PinotRealtimeSegmentManager.class.getName());
   }
 
   private synchronized void assignRealtimeSegmentsToServerInstancesIfNecessary()
@@ -265,7 +261,7 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
   }
 
   private boolean isLeader() {
-    return _pinotHelixResourceManager.isLeader();
+    return ControllerLeadershipManager.getInstance().isLeader();
   }
 
   @Override
@@ -408,4 +404,14 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
     LOGGER.info("PinotRealtimeSegmentManager.handleDataDeleted: {}", dataPath);
     processPropertyStoreChange(dataPath);
   }
+
+  @Override
+  public void onBecomingLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
+
+  @Override
+  public void onBecomingNonLeader() {
+    processPropertyStoreChange(CONTROLLER_LEADER_CHANGE);
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 1ba51bc..6d47ca7 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -15,6 +15,7 @@
  */
 package com.linkedin.pinot.controller.helix.core.realtime;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ControllerMeter;
@@ -23,6 +24,7 @@ import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -153,7 +155,7 @@ public class SegmentCompletionManager {
    * that it currently has (i.e. next offset that it will consume, if it continues to consume).
    */
   public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -190,7 +192,7 @@ public class SegmentCompletionManager {
    * incoming segment).
    */
   public SegmentCompletionProtocol.Response segmentCommitStart(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -214,7 +216,7 @@ public class SegmentCompletionManager {
   }
 
   public SegmentCompletionProtocol.Response extendBuildTime(final SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -245,7 +247,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -280,7 +282,7 @@ public class SegmentCompletionManager {
    * @return
    */
   public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit) {
-    if (!_helixManager.isLeader() || !_helixManager.isConnected()) {
+    if (!isLeader() || !_helixManager.isConnected()) {
       _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_NOT_LEADER, 1L);
       return SegmentCompletionProtocol.RESP_NOT_LEADER;
     }
@@ -1091,4 +1093,9 @@ public class SegmentCompletionManager {
       return false;
     }
   }
+
+  @VisibleForTesting
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
index 7d6be94..2614c2d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/StorageQuotaChecker.java
@@ -23,6 +23,7 @@ import com.linkedin.pinot.common.exception.InvalidConfigException;
 import com.linkedin.pinot.common.metrics.ControllerGauge;
 import com.linkedin.pinot.common.metrics.ControllerMetrics;
 import com.linkedin.pinot.common.utils.DataSize;
+import com.linkedin.pinot.controller.ControllerLeadershipManager;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.util.TableSizeReader;
 import java.io.File;
@@ -150,7 +151,7 @@ public class StorageQuotaChecker {
         tableName, tableSubtypeSize.estimatedSizeInBytes, tableSubtypeSize.reportedSizeInBytes);
 
     // Only emit the real percentage of storage quota usage by lead controller, otherwise emit 0L.
-    if (_pinotHelixResourceManager.isLeader() && allowedStorageBytes != 0L) {
+    if (isLeader() && allowedStorageBytes != 0L) {
       long existingStorageQuotaUtilization = tableSubtypeSize.estimatedSizeInBytes  * 100 / allowedStorageBytes;
       _controllerMetrics.setValueOfTableGauge(tableName, ControllerGauge.TABLE_STORAGE_QUOTA_UTILIZATION,
           existingStorageQuotaUtilization);
@@ -193,4 +194,8 @@ public class StorageQuotaChecker {
       return failure(message);
     }
   }
+
+  protected boolean isLeader() {
+    return ControllerLeadershipManager.getInstance().isLeader();
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
index de6eca9..4d4c324 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -75,7 +75,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -87,7 +86,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -141,7 +140,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -153,7 +151,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -218,7 +216,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -231,7 +228,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -266,7 +263,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -278,7 +274,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -301,7 +297,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -313,7 +308,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -369,7 +364,6 @@ public class SegmentStatusCheckerTest {
 
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -382,7 +376,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(externalView.getId(),
@@ -415,7 +409,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -427,7 +420,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -454,7 +447,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -466,7 +458,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -501,7 +493,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -513,7 +504,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     // verify state before test
     Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(
         ControllerGauge.DISABLED_TABLE_COUNT), 0);
@@ -553,7 +544,6 @@ public class SegmentStatusCheckerTest {
     }
     {
       helixResourceManager = mock(PinotHelixResourceManager.class);
-      when(helixResourceManager.isLeader()).thenReturn(true);
       when(helixResourceManager.getAllTables()).thenReturn(allTableNames);
       when(helixResourceManager.getHelixClusterName()).thenReturn("StatusChecker");
       when(helixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);
@@ -565,7 +555,7 @@ public class SegmentStatusCheckerTest {
     }
     metricsRegistry = new MetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
-    segmentStatusChecker = new SegmentStatusChecker(helixResourceManager, config, controllerMetrics);
+    segmentStatusChecker = new MockSegmentStatusChecker(helixResourceManager, config, controllerMetrics);
     segmentStatusChecker.init();
     segmentStatusChecker.run();
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
@@ -577,4 +567,17 @@ public class SegmentStatusCheckerTest {
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
         ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
   }
+
+  private static class MockSegmentStatusChecker extends SegmentStatusChecker {
+    // Test double for SegmentStatusChecker whose isLeader() always returns true.
+    public MockSegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+        ControllerMetrics metricsRegistry) {
+      super(pinotHelixResourceManager, config, metricsRegistry);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 6a2c1b1..9a5d26c 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -32,8 +32,8 @@ public class ControllerPeriodicTaskTest {
   private final AtomicBoolean _onBecomeNonLeaderCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
 
-  private final ControllerPeriodicTask _task =
-      new ControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
+  private final MockControllerPeriodicTask _task =
+      new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS, _resourceManager) {
         @Override
         public void onBecomeLeader() {
           _onBecomeLeaderCalled.set(true);
@@ -48,6 +48,7 @@ public class ControllerPeriodicTaskTest {
         public void process(List<String> tables) {
           _processCalled.set(true);
         }
+
       };
 
   private void resetState() {
@@ -68,6 +69,7 @@ public class ControllerPeriodicTaskTest {
   public void testChangeLeadership() {
     // Initial state
     resetState();
+    _task.setLeader(false);
     _task.init();
     assertFalse(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -82,7 +84,7 @@ public class ControllerPeriodicTaskTest {
 
     // From non-leader to leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(true);
+    _task.setLeader(true);
     _task.run();
     assertTrue(_onBecomeLeaderCalled.get());
     assertFalse(_onBecomeNonLeaderCalled.get());
@@ -97,10 +99,33 @@ public class ControllerPeriodicTaskTest {
 
     // From leader to non-leader
     resetState();
-    when(_resourceManager.isLeader()).thenReturn(false);
+    _task.setLeader(false);
     _task.run();
     assertFalse(_onBecomeLeaderCalled.get());
     assertTrue(_onBecomeNonLeaderCalled.get());
     assertFalse(_processCalled.get());
   }
+
+  private static class MockControllerPeriodicTask extends ControllerPeriodicTask {
+    private boolean _isLeader = true; // returned by isLeader(); tests toggle it via setLeader()
+
+    public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds,
+        PinotHelixResourceManager pinotHelixResourceManager) {
+      super(taskName, runFrequencyInSeconds, pinotHelixResourceManager);
+    }
+
+    @Override
+    public void process(List<String> tables) {
+      // Intentionally empty: this mock only controls leadership; subclasses may override process().
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
+
+    void setLeader(boolean isLeader) {
+      _isLeader = isLeader;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index d867be3..310df33 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1131,13 +1131,20 @@ public class SegmentCompletionTest {
 
   public static class MockSegmentCompletionManager extends SegmentCompletionManager {
     public long _secconds;
+    private boolean _isLeader; // value returned by the isLeader() override below
     protected MockSegmentCompletionManager(PinotLLCRealtimeSegmentManager segmentManager, boolean isLeader,
         boolean isConnected) {
       super(createMockHelixManager(isLeader, isConnected), segmentManager, new ControllerMetrics(new MetricsRegistry()));
+      _isLeader = isLeader; // NOTE(review): assigned after super(...) returns; isLeader() yields false until then
     }
     @Override
     protected long getCurrentTimeMs() {
       return _secconds * 1000L;
     }
+
+    @Override
+    protected boolean isLeader() {
+      return _isLeader;
+    }
   }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 1baf36f..b7c67a5 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -84,7 +84,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -156,7 +156,6 @@ public class RetentionManagerTest {
   private void setupPinotHelixResourceManager(TableConfig tableConfig, final List<String> removedSegments,
       PinotHelixResourceManager resourceManager) {
     final String tableNameWithType = tableConfig.getTableName();
-    when(resourceManager.isLeader()).thenReturn(true);
     when(resourceManager.getAllTables()).thenReturn(Collections.singletonList(tableNameWithType));
 
     SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class);
@@ -202,7 +201,7 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    RetentionManager retentionManager = new MockRetentionManager(pinotHelixResourceManager, 0, 0);
     retentionManager.init();
     retentionManager.run();
 
@@ -306,4 +305,17 @@ public class RetentionManagerTest {
     when(segmentMetadata.getTimeGranularity()).thenReturn(new Duration(timeUnit.toMillis(1)));
     return segmentMetadata;
   }
+
+  private static class MockRetentionManager extends RetentionManager {
+    // Test double for RetentionManager whose isLeader() always returns true.
+    public MockRetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
+        int deletedSegmentsRetentionInDays) {
+      super(pinotHelixResourceManager, runFrequencyInSeconds, deletedSegmentsRetentionInDays);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
index e64b6ee..4ea6967 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -58,7 +58,6 @@ public class StorageQuotaCheckerTest {
     _pinotHelixResourceManager = mock(PinotHelixResourceManager.class);
     when(_tableConfig.getValidationConfig()).thenReturn(_validationConfig);
     when(_validationConfig.getReplicationNumber()).thenReturn(2);
-    when(_pinotHelixResourceManager.isLeader()).thenReturn(true);
     TEST_DIR.mkdirs();
   }
 
@@ -69,7 +68,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoQuota() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(null);
     StorageQuotaChecker.QuotaCheckerResponse res =
         checker.isSegmentStorageWithinQuota(TEST_DIR, "myTable", "segment", 1000);
@@ -78,7 +77,7 @@ public class StorageQuotaCheckerTest {
 
   @Test
   public void testNoStorageQuotaConfig() throws InvalidConfigException {
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(-1L);
     StorageQuotaChecker.QuotaCheckerResponse res =
@@ -118,7 +117,7 @@ public class StorageQuotaCheckerTest {
     when(_tableConfig.getQuotaConfig()).thenReturn(_quotaConfig);
     when(_quotaConfig.storageSizeBytes()).thenReturn(3000L);
     when(_quotaConfig.getStorage()).thenReturn("3K");
-    StorageQuotaChecker checker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
+    StorageQuotaChecker checker = new MockStorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, _pinotHelixResourceManager);
     StorageQuotaChecker.QuotaCheckerResponse response =
         checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
@@ -160,4 +159,17 @@ public class StorageQuotaCheckerTest {
     response = checker.isSegmentStorageWithinQuota(TEST_DIR, tableName, "segment1", 1000);
     Assert.assertTrue(response.isSegmentWithinQuota);
   }
+
+  private static class MockStorageQuotaChecker extends StorageQuotaChecker {
+    // Test double for StorageQuotaChecker whose isLeader() always returns true.
+    public MockStorageQuotaChecker(TableConfig tableConfig, TableSizeReader tableSizeReader,
+        ControllerMetrics controllerMetrics, PinotHelixResourceManager pinotHelixResourceManager) {
+      super(tableConfig, tableSizeReader, controllerMetrics, pinotHelixResourceManager);
+    }
+
+    @Override
+    protected boolean isLeader() {
+      return true;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org