You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/01/28 21:37:26 UTC

[incubator-pinot] branch master updated: Fix SegmentStatusCheckerIntegrationTest setup timings (#3749)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9004e38  Fix SegmentStatusCheckerIntegrationTest setup timings (#3749)
9004e38 is described below

commit 9004e3836651e650b13bda4e439b2f5fba6c5054
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 28 13:37:20 2019 -0800

    Fix SegmentStatusCheckerIntegrationTest setup timings (#3749)
    
    * Fix SegmentStatusCheckerIntegrationTest setup timings
    
    * Keep only 1 ControllerGauge value and use suffix for task name
    
    * Some more refactoring to keep counting of numTablesProcessed in base class of periodic tasks
---
 .../pinot/common/metrics/AbstractMetrics.java      |  55 +++--
 .../pinot/common/metrics/ControllerGauge.java      |   2 +
 .../apache/pinot/controller/ControllerStarter.java |  14 +-
 .../controller/helix/SegmentStatusChecker.java     | 237 ++++++++++-----------
 .../helix/core/minion/PinotTaskManager.java        |  13 +-
 .../core/periodictask/ControllerPeriodicTask.java  |  23 +-
 .../core/relocation/RealtimeSegmentRelocator.java  |  63 +++---
 .../helix/core/retention/RetentionManager.java     |  77 ++++---
 .../BrokerResourceValidationManager.java           |  32 +--
 .../validation/OfflineSegmentIntervalChecker.java  |  29 ++-
 .../RealtimeSegmentValidationManager.java          |  44 ++--
 .../periodictask/ControllerPeriodicTaskTest.java   |  42 ++--
 .../relocation/RealtimeSegmentRelocatorTest.java   |  11 +-
 .../helix/core/retention/RetentionManagerTest.java |   8 +-
 .../tasks/SegmentStatusCheckerIntegrationTest.java |  41 +++-
 15 files changed, 389 insertions(+), 302 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 6f78211..7857f57 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -327,23 +327,22 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     String gaugeName = gauge.getGaugeName();
     fullGaugeName = gaugeName + "." + getTableName(tableName);
 
-    if (!_gaugeValues.containsKey(fullGaugeName)) {
-      synchronized (_gaugeValues) {
-        if(!_gaugeValues.containsKey(fullGaugeName)) {
-          _gaugeValues.put(fullGaugeName, new AtomicLong(value));
-          addCallbackGauge(fullGaugeName, new Callable<Long>() {
-            @Override
-            public Long call() throws Exception {
-              return _gaugeValues.get(fullGaugeName).get();
-            }
-          });
-        } else {
-          _gaugeValues.get(fullGaugeName).set(value);
-        }
-      }
-    } else {
-      _gaugeValues.get(fullGaugeName).set(value);
-    }
+    setValueOfGauge(value, fullGaugeName);
+  }
+
+  /**
+   * Sets the value of a custom global gauge.
+   *
+   * @param suffix The suffix to attach to the gauge name
+   * @param gauge The gauge to use
+   * @param value The value to set the gauge to
+   */
+  public void setValueOfGlobalGauge(final G gauge, final String suffix, final long value) {
+    final String fullGaugeName;
+    String gaugeName = gauge.getGaugeName();
+    fullGaugeName = gaugeName + "." + suffix;
+
+    setValueOfGauge(value, fullGaugeName);
   }
 
   /**
@@ -355,16 +354,15 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
   public void setValueOfGlobalGauge(final G gauge, final long value) {
     final String gaugeName = gauge.getGaugeName();
 
+    setValueOfGauge(value, gaugeName);
+  }
+
+  private void setValueOfGauge(long value, String gaugeName) {
     if (!_gaugeValues.containsKey(gaugeName)) {
       synchronized (_gaugeValues) {
         if(!_gaugeValues.containsKey(gaugeName)) {
           _gaugeValues.put(gaugeName, new AtomicLong(value));
-          addCallbackGauge(gaugeName, new Callable<Long>() {
-            @Override
-            public Long call() throws Exception {
-              return _gaugeValues.get(gaugeName).get();
-            }
-          });
+          addCallbackGauge(gaugeName, () -> _gaugeValues.get(gaugeName).get());
         } else {
           _gaugeValues.get(gaugeName).set(value);
         }
@@ -412,6 +410,17 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     }
   }
 
+
+  @VisibleForTesting
+  public long getValueOfGlobalGauge(final G gauge, String suffix) {
+    String fullGaugeName = gauge.getGaugeName() + "." + suffix;
+    if (!_gaugeValues.containsKey(fullGaugeName)) {
+      return 0;
+    } else {
+      return _gaugeValues.get(fullGaugeName).get();
+    }
+  }
+
   /**
    * Gets the value of a table gauge.
    *
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 38ba4df..0c18040 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -38,6 +38,8 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
   OFFLINE_TABLE_COUNT("TableCount", true),
   DISABLED_TABLE_COUNT("TableCount", true),
 
+  PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
+
   SHORT_OF_LIVE_INSTANCES("ShortOfLiveInstances", false), // Number of extra live instances needed.
 
   REALTIME_TABLE_ESTIMATED_SIZE("RealtimeTableEstimatedSize", false), // Estimated size of realtime table.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index ab0a7ad..6b0efc8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -300,21 +300,21 @@ public class ControllerStarter {
     List<PeriodicTask> periodicTasks = new ArrayList<>();
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config);
+    _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry));
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
+            _controllerMetrics);
     periodicTasks.add(_offlineSegmentIntervalChecker);
-    _realtimeSegmentValidationManager =
-        new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
-            new ValidationMetrics(_metricsRegistry));
+    _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
+        PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
     periodicTasks.add(_realtimeSegmentValidationManager);
     _brokerResourceValidationManager =
-        new BrokerResourceValidationManager(_config, _helixResourceManager);
+        new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
     periodicTasks.add(_brokerResourceValidationManager);
     _segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_segmentStatusChecker);
-    _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
+    _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_realtimeSegmentRelocator);
 
     return periodicTasks;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 3cb1ee4..84bf706 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -45,7 +45,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   public static final String ONLINE = "ONLINE";
   public static final String ERROR = "ERROR";
   public static final String CONSUMING = "CONSUMING";
-  private final ControllerMetrics _metricsRegistry;
   private final int _waitForPushTimeSeconds;
 
   // log messages about disabled tables atmost once a day
@@ -63,12 +62,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
    * @param config The controller configuration object
    */
   public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
-      ControllerMetrics metricsRegistry) {
+      ControllerMetrics controllerMetrics) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
-        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
 
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
-    _metricsRegistry = metricsRegistry;
   }
 
   @Override
@@ -99,6 +97,13 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }
 
   @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
+    // Remove the metric for this table
+    resetTableMetrics(tableNameWithType);
+  }
+
+  @Override
   protected void postprocess() {
     _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, _realTimeTableCount);
     _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, _offlineTableCount);
@@ -113,138 +118,132 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
    */
   private void updateSegmentMetrics(String tableNameWithType) {
 
-    try {
-      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
-        _offlineTableCount++;
-      } else {
-        _realTimeTableCount++;
-      }
+    if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+      _offlineTableCount++;
+    } else {
+      _realTimeTableCount++;
+    }
 
-      IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
 
-      if (idealState == null) {
-        LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
-        resetTableMetrics(tableNameWithType);
-        return;
-      }
+    if (idealState == null) {
+      LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
+      resetTableMetrics(tableNameWithType);
+      return;
+    }
 
-      if (!idealState.isEnabled()) {
-        if (_logDisabledTables) {
-          LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
-        }
-        resetTableMetrics(tableNameWithType);
-        _disabledTableCount++;
-        return;
+    if (!idealState.isEnabled()) {
+      if (_logDisabledTables) {
+        LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
       }
+      resetTableMetrics(tableNameWithType);
+      _disabledTableCount++;
+      return;
+    }
 
-      if (idealState.getPartitionSet().isEmpty()) {
-        int nReplicasFromIdealState = 1;
-        try {
-          nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
-        } catch (NumberFormatException e) {
-          // Ignore
-        }
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
-        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
-        return;
+    if (idealState.getPartitionSet().isEmpty()) {
+      int nReplicasFromIdealState = 1;
+      try {
+        nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+      } catch (NumberFormatException e) {
+        // Ignore
       }
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS,
+          nReplicasFromIdealState);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+      return;
+    }
 
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
-          idealState.toString().length());
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
-          (long) (idealState.getPartitionSet().size()));
-      ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+        idealState.toString().length());
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
+        (long) (idealState.getPartitionSet().size()));
+    ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
 
-      int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
-      int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
-      int nErrors = 0; // Keeps track of number of segments in error state
-      int nOffline = 0; // Keeps track of number segments with no online replicas
-      int nSegments = 0; // Counts number of segments
-      for (String partitionName : idealState.getPartitionSet()) {
-        int nReplicas = 0;
-        int nIdeal = 0;
-        nSegments++;
-        // Skip segments not online in ideal state
-        for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
-          if (serverAndState == null) {
-            break;
-          }
-          if (serverAndState.getValue().equals(ONLINE)) {
-            nIdeal++;
-            break;
-          }
+    int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
+    int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
+    int nErrors = 0; // Keeps track of number of segments in error state
+    int nOffline = 0; // Keeps track of number segments with no online replicas
+    int nSegments = 0; // Counts number of segments
+    for (String partitionName : idealState.getPartitionSet()) {
+      int nReplicas = 0;
+      int nIdeal = 0;
+      nSegments++;
+      // Skip segments not online in ideal state
+      for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+        if (serverAndState == null) {
+          break;
         }
-        if (nIdeal == 0) {
-          // No online segments in ideal state
-          continue;
+        if (serverAndState.getValue().equals(ONLINE)) {
+          nIdeal++;
+          break;
         }
-        nReplicasIdealMax =
-            (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
-                partitionName).size() : nReplicasIdealMax;
-        if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
-          // No replicas for this segment
-          TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-          if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
-            OfflineSegmentZKMetadata segmentZKMetadata =
-                _pinotHelixResourceManager.getOfflineSegmentZKMetadata(tableNameWithType, partitionName);
+      }
+      if (nIdeal == 0) {
+        // No online segments in ideal state
+        continue;
+      }
+      nReplicasIdealMax =
+          (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
+              partitionName).size() : nReplicasIdealMax;
+      if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
+        // No replicas for this segment
+        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+        if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+          OfflineSegmentZKMetadata segmentZKMetadata =
+              _pinotHelixResourceManager.getOfflineSegmentZKMetadata(tableNameWithType, partitionName);
 
-            if (segmentZKMetadata != null
-                && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
-              // push not yet finished, skip
-              continue;
-            }
-          }
-          nOffline++;
-          if (nOffline < MaxOfflineSegmentsToLog) {
-            LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
-          }
-          nReplicasExternal = 0;
-          continue;
-        }
-        for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
-          // Count number of online replicas. Ignore if state is CONSUMING.
-          // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
-          // So, ignore this combination. If a segment exists in this combination for a long time, we will get
-          // low level-partition-not-consuming alert anyway.
-          if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
-            nReplicas++;
-          }
-          if (serverAndState.getValue().equals(ERROR)) {
-            nErrors++;
+          if (segmentZKMetadata != null
+              && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
+            // push not yet finished, skip
+            continue;
           }
         }
-        if (nReplicas == 0) {
-          if (nOffline < MaxOfflineSegmentsToLog) {
-            LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
-          }
-          nOffline++;
+        nOffline++;
+        if (nOffline < MaxOfflineSegmentsToLog) {
+          LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
         }
-        nReplicasExternal =
-            ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
-      }
-      if (nReplicasExternal == -1) {
-        nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+        nReplicasExternal = 0;
+        continue;
       }
-      // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
-          (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
-      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
-          (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
-      if (nOffline > 0) {
-        LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+      for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
+        // Count number of online replicas. Ignore if state is CONSUMING.
+        // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
+        // So, ignore this combination. If a segment exists in this combination for a long time, we will get
+        // low level-partition-not-consuming alert anyway.
+        if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
+          nReplicas++;
+        }
+        if (serverAndState.getValue().equals(ERROR)) {
+          nErrors++;
+        }
       }
-      if (nReplicasExternal < nReplicasIdealMax) {
-        LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
-            nReplicasIdealMax);
+      if (nReplicas == 0) {
+        if (nOffline < MaxOfflineSegmentsToLog) {
+          LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
+        }
+        nOffline++;
       }
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
-
-      // Remove the metric for this table
-      resetTableMetrics(tableNameWithType);
+      nReplicasExternal =
+          ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
+    }
+    if (nReplicasExternal == -1) {
+      nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+    }
+    // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
+        (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+    _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+        (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+    if (nOffline > 0) {
+      LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+    }
+    if (nReplicasExternal < nReplicasIdealMax) {
+      LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
+          nReplicasIdealMax);
     }
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 2052f77..ed3bad6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -50,7 +50,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   private final PinotHelixTaskResourceManager _helixTaskResourceManager;
   private final ClusterInfoProvider _clusterInfoProvider;
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
-  private final ControllerMetrics _controllerMetrics;
 
   private Map<String, List<TableConfig>> _enabledTableConfigMap;
   private Set<String> _taskTypes;
@@ -61,11 +60,10 @@ public class PinotTaskManager extends ControllerPeriodicTask {
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
     super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
-        controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager);
+        controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager, controllerMetrics);
     _helixTaskResourceManager = helixTaskResourceManager;
     _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
     _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
-    _controllerMetrics = controllerMetrics;
   }
 
   @Override
@@ -104,7 +102,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
 
   @Override
   protected void preprocess() {
-    _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
+    _metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
 
     _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
     _numTaskTypes = _taskTypes.size();
@@ -134,6 +132,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   }
 
   @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Exception in PinotTaskManager for table {}", tableNameWithType, e);
+  }
+
+  @Override
   protected void postprocess() {
     // Generate each type of tasks
     _tasksScheduled = new HashMap<>(_numTaskTypes);
@@ -147,7 +150,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
             pinotTaskConfigs);
         _tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
             pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
-        _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+        _metricsRegistry.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 40cd982..e7ddbb3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,6 +20,8 @@ package org.apache.pinot.controller.helix.core.periodictask;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.util.List;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.periodictask.BasePeriodicTask;
 import org.slf4j.Logger;
@@ -33,18 +35,19 @@ import org.slf4j.LoggerFactory;
 public abstract class ControllerPeriodicTask extends BasePeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
 
-
   private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
 
   protected final PinotHelixResourceManager _pinotHelixResourceManager;
+  protected final ControllerMetrics _metricsRegistry;
 
   private volatile boolean _stopPeriodicTask;
   private volatile boolean _periodicTaskInProgress;
 
   public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
-      PinotHelixResourceManager pinotHelixResourceManager) {
+      PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
     super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
     _pinotHelixResourceManager = pinotHelixResourceManager;
+    _metricsRegistry = controllerMetrics;
   }
 
   /**
@@ -118,16 +121,28 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
    */
   protected void process(List<String> tableNamesWithType) {
     if (!shouldStopPeriodicTask()) {
+
+      int numTablesProcessed = 0;
       preprocess();
+
       for (String tableNameWithType : tableNamesWithType) {
         if (shouldStopPeriodicTask()) {
           LOGGER.info("Skip processing table {} and all the remaining tables for task {}.", tableNameWithType,
               getTaskName());
           break;
         }
-        processTable(tableNameWithType);
+        try {
+          processTable(tableNameWithType);
+          numTablesProcessed++;
+        } catch (Exception e) {
+          exceptionHandler(tableNameWithType, e);
+        }
       }
+
       postprocess();
+      _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, getTaskName(),
+          numTablesProcessed);
+
     } else {
       LOGGER.info("Skip processing all tables for task {}", getTaskName());
     }
@@ -149,6 +164,8 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
    */
   protected abstract void postprocess();
 
+  protected abstract void exceptionHandler(String tableNameWithType, Exception e);
+
   @VisibleForTesting
   protected boolean shouldStopPeriodicTask() {
     return _stopPeriodicTask;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index a5a81de..5cbcb48 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -33,6 +33,8 @@ import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.time.TimeUtils;
@@ -54,9 +56,10 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
 
-  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+  public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+      ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
   }
 
   @Override
@@ -66,17 +69,23 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
 
   @Override
   protected void preprocess() {
-
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    runRelocation(tableNameWithType);
+    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+      runRelocation(tableNameWithType);
+    }
   }
 
   @Override
   protected void postprocess() {
+  }
 
+  @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
   }
 
   /**
@@ -87,38 +96,30 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
    * @param tableNameWithType
    */
   private void runRelocation(String tableNameWithType) {
-    // Only consider realtime tables.
-    if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+    LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+    final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
+    if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+      LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
       return;
     }
-    try {
-      LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
 
-      TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
-      final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
-      if (!realtimeTagConfig.isRelocateCompletedSegments()) {
-        LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
-        return;
-      }
-
-      Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
-        @Nullable
-        @Override
-        public IdealState apply(@Nullable IdealState idealState) {
-          if (!idealState.isEnabled()) {
-            LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
-            return null;
-          }
-          relocateSegments(realtimeTagConfig, idealState);
-          return idealState;
+    Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
+      @Nullable
+      @Override
+      public IdealState apply(@Nullable IdealState idealState) {
+        if (!idealState.isEnabled()) {
+          LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
+          return null;
         }
-      };
+        relocateSegments(realtimeTagConfig, idealState);
+        return idealState;
+      }
+    };
 
-      HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
-          RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
-    } catch (Exception e) {
-      LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
-    }
+    HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
+        RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
   }
 
   /**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 25a2db8..53f9b35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.SegmentName;
@@ -53,9 +54,10 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+      ControllerMetrics controllerMetrics) {
     super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
     _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
@@ -69,17 +71,12 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   @Override
   protected void preprocess() {
-
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    try {
-      LOGGER.info("Start managing retention for table: {}", tableNameWithType);
-      manageRetentionForTable(tableNameWithType);
-    } catch (Exception e) {
-      LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
-    }
+    LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+    manageRetentionForTable(tableNameWithType);
   }
 
   @Override
@@ -88,39 +85,41 @@ public class RetentionManager extends ControllerPeriodicTask {
     _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
   }
 
+  @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
+  }
+
   private void manageRetentionForTable(String tableNameWithType) {
-    try {
-      // Build retention strategy from table config
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
-        return;
-      }
-      SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
-      String segmentPushType = validationConfig.getSegmentPushType();
-      if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
-        LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
-        return;
-      }
-      String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
-      String retentionTimeValue = validationConfig.getRetentionTimeValue();
-      RetentionStrategy retentionStrategy;
-      try {
-        retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
-            Long.parseLong(retentionTimeValue));
-      } catch (Exception e) {
-        LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
-        return;
-      }
 
-      // Scan all segment ZK metadata and purge segments if necessary
-      if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
-        manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
-      } else {
-        manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
-      }
+    // Build retention strategy from table config
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
+      return;
+    }
+    SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+    String segmentPushType = validationConfig.getSegmentPushType();
+    if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
+      LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+      return;
+    }
+    String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+    String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    RetentionStrategy retentionStrategy;
+    try {
+      retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
+          Long.parseLong(retentionTimeValue));
     } catch (Exception e) {
-      LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
+      LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
+      return;
+    }
+
+    // Scan all segment ZK metadata and purge segments if necessary
+    if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
+      manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+    } else {
+      manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
     }
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index c969ab1..c71a4ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -37,9 +38,10 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
 
   private List<InstanceConfig> _instanceConfigs;
 
-  public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+  public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      ControllerMetrics controllerMetrics) {
     super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
   }
 
   @Override
@@ -49,26 +51,26 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
 
   @Override
   protected void processTable(String tableNameWithType) {
-    try {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
-        return;
-      }
-
-      // Rebuild broker resource
-      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
-          tableConfig.getTenantConfig().getBroker());
-      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+      return;
     }
+
+    // Rebuild broker resource
+    Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+        tableConfig.getTenantConfig().getBroker());
+    _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
   }
 
 
   @Override
   protected void postprocess() {
+  }
 
+  @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 14b9827..ca9563b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.time.TimeUtils;
@@ -48,9 +49,9 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      ValidationMetrics validationMetrics) {
+      ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
     _validationMetrics = validationMetrics;
   }
 
@@ -60,21 +61,15 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
 
   @Override
   protected void processTable(String tableNameWithType) {
-    try {
+    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
 
-      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-      if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-
-        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-        if (tableConfig == null) {
-          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-          return;
-        }
-
-        validateOfflineSegmentPush(tableConfig);
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
       }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
+      validateOfflineSegmentPush(tableConfig);
     }
   }
 
@@ -212,7 +207,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
 
   @Override
   protected void postprocess() {
+  }
 
+  @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 245437f..a43d63b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.HLCSegmentName;
@@ -53,9 +54,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
   private boolean _updateRealtimeDocumentCount;
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
+      ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+        config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
 
@@ -78,28 +80,24 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
 
   @Override
   protected void processTable(String tableNameWithType) {
-    try {
-      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-      if (tableType == CommonConstants.Helix.TableType.REALTIME) {
-
-        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-        if (tableConfig == null) {
-          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-          return;
-        }
+    CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    if (tableType == CommonConstants.Helix.TableType.REALTIME) {
 
-        if (_updateRealtimeDocumentCount) {
-          updateRealtimeDocumentCount(tableConfig);
-        }
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
 
-        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-        if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
-        }
+      if (_updateRealtimeDocumentCount) {
+        updateRealtimeDocumentCount(tableConfig);
+      }
+
+      Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+      StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+      if (streamConfig.hasLowLevelConsumerType()) {
+        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
       }
-    } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
     }
   }
 
@@ -151,7 +149,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
 
   @Override
   protected void postprocess() {
+  }
 
+  @Override
+  protected void exceptionHandler(String tableNameWithType, Exception e) {
+    LOGGER.error("Caught exception while validating realtime table: {}", tableNameWithType, e);
   }
 
   @Override
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 29a96de..e20cdc8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.periodictask;
 
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.testng.annotations.BeforeTest;
@@ -40,14 +43,16 @@ public class ControllerPeriodicTaskTest {
   private final ControllerConf _controllerConf = new ControllerConf();
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+  private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _processCalled = new AtomicBoolean();
-  private final AtomicInteger _numTablesProcessed = new AtomicInteger();
+  private final AtomicInteger _tablesProcessed = new AtomicInteger();
   private final int _numTables = 3;
+  private static final String TASK_NAME = "TestTask";
 
-  private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
-      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager) {
+  private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
+      _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
 
     @Override
     protected void initTask() {
@@ -67,7 +72,7 @@ public class ControllerPeriodicTaskTest {
 
     @Override
     public void processTable(String tableNameWithType) {
-      _numTablesProcessed.getAndIncrement();
+      _tablesProcessed.getAndIncrement();
     }
   };
 
@@ -82,7 +87,8 @@ public class ControllerPeriodicTaskTest {
     _initTaskCalled.set(false);
     _stopTaskCalled.set(false);
     _processCalled.set(false);
-    _numTablesProcessed.set(0);
+    _tablesProcessed.set(0);
+    _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME,0);
   }
 
   @Test
@@ -102,16 +108,20 @@ public class ControllerPeriodicTaskTest {
     _task.init();
     assertTrue(_initTaskCalled.get());
     assertFalse(_processCalled.get());
-    assertEquals(_numTablesProcessed.get(), 0);
+    assertEquals(_tablesProcessed.get(), 0);
     assertFalse(_stopTaskCalled.get());
     assertFalse(_task.shouldStopPeriodicTask());
+    assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+        0);
 
     // run task - leadership gained
     resetState();
     _task.run();
     assertFalse(_initTaskCalled.get());
     assertTrue(_processCalled.get());
-    assertEquals(_numTablesProcessed.get(), _numTables);
+    assertEquals(_tablesProcessed.get(), _numTables);
+    assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+        _numTables);
     assertFalse(_stopTaskCalled.get());
     assertFalse(_task.shouldStopPeriodicTask());
 
@@ -120,7 +130,9 @@ public class ControllerPeriodicTaskTest {
     _task.stop();
     assertFalse(_initTaskCalled.get());
     assertFalse(_processCalled.get());
-    assertEquals(_numTablesProcessed.get(), 0);
+    assertEquals(_tablesProcessed.get(), 0);
+    assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+        0);
     assertTrue(_stopTaskCalled.get());
     assertTrue(_task.shouldStopPeriodicTask());
 
@@ -130,7 +142,9 @@ public class ControllerPeriodicTaskTest {
     assertFalse(_task.shouldStopPeriodicTask());
     assertFalse(_initTaskCalled.get());
     assertTrue(_processCalled.get());
-    assertEquals(_numTablesProcessed.get(), _numTables);
+    assertEquals(_tablesProcessed.get(), _numTables);
+    assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+        _numTables);
     assertFalse(_stopTaskCalled.get());
 
   }
@@ -138,8 +152,8 @@ public class ControllerPeriodicTaskTest {
   private class MockControllerPeriodicTask extends ControllerPeriodicTask {
 
     public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
-        PinotHelixResourceManager pinotHelixResourceManager) {
-      super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager);
+        PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+      super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager, controllerMetrics);
     }
 
     @Override
@@ -149,7 +163,6 @@ public class ControllerPeriodicTaskTest {
 
     @Override
     protected void preprocess() {
-
     }
 
     @Override
@@ -159,9 +172,12 @@ public class ControllerPeriodicTaskTest {
 
     @Override
     public void postprocess() {
-
     }
 
+    @Override
+    public void exceptionHandler(String tableNameWithType, Exception e) {
+
+    }
 
     @Override
     public void stopTask() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 7271ca8..4dedc43 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.relocation;
 
 import com.google.common.collect.Lists;
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.config.RealtimeTagConfig;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -68,7 +70,9 @@ public class RealtimeSegmentRelocatorTest {
     _mockHelixManager = mock(HelixManager.class);
     when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
     ControllerConf controllerConfig = new ControllerConf();
-    _realtimeSegmentRelocator = new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig);
+    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    _realtimeSegmentRelocator =
+        new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
 
     final int maxInstances = 20;
     serverNames = new String[maxInstances];
@@ -261,8 +265,9 @@ public class RealtimeSegmentRelocatorTest {
 
     private Map<String, List<String>> tagToInstances;
 
-    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
-      super(pinotHelixResourceManager, config);
+    public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+        ControllerMetrics controllerMetrics) {
+      super(pinotHelixResourceManager, config, controllerMetrics);
       tagToInstances = new HashedMap();
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index a171505..e569439 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
+import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -86,9 +88,10 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
     ControllerConf conf = new ControllerConf();
+    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
     retentionManager.init();
     retentionManager.run();
 
@@ -206,9 +209,10 @@ public class RetentionManagerTest {
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
     ControllerConf conf = new ControllerConf();
+    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
     retentionManager.init();
     retentionManager.run();
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
index deefc4a..9966840 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
@@ -40,6 +40,8 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
 import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -52,13 +54,18 @@ import org.testng.annotations.Test;
  */
 public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationTestSet {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStatusCheckerIntegrationTest.class);
+
   private String emptyTable = "table1_OFFLINE";
   private String disabledOfflineTable = "table2_OFFLINE";
   private String basicOfflineTable = "table3_OFFLINE";
   private String errorOfflineTable = "table4_OFFLINE";
   private String realtimeTableErrorState = "table5_REALTIME";
+  private String _currentTableName;
+  private static final int NUM_TABLES = 5;
 
   private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
+  private static final int SEGMENT_STATUS_CHECKER_FREQ_SECONDS = 5;
 
   @BeforeClass
   public void setUp() throws Exception {
@@ -67,14 +74,13 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
     startZk();
 
     // Set initial delay of 60 seconds for the segment status checker, to allow time for tables setup.
-    // By default, it will pick a random delay between 120s and 300s
+    // Run at 5 seconds freq in order to keep it running, in case first run happens before table setup
     ControllerConf controllerConf = getDefaultControllerConfiguration();
     controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
+    controllerConf.setStatusCheckerFrequencyInSeconds(SEGMENT_STATUS_CHECKER_FREQ_SECONDS);
 
     startController(controllerConf);
-
     startBroker();
-
     startServers(3);
 
     // empty table
@@ -107,9 +113,6 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
 
     // realtime table with segments in error state
     setupRealtimeTable(realtimeTableErrorState);
-
-    // we need to wait for SegmentStatusChecker to finish at least 1 run
-    Thread.sleep(TimeUnit.MILLISECONDS.convert(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS + 10, TimeUnit.SECONDS));
   }
 
   private void setupOfflineTable(String table) throws Exception {
@@ -119,6 +122,8 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
   }
 
   private void setupOfflineTableAndSegments(String table) throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+    setTableName(table);
     _realtimeTableConfig = null;
     addOfflineTable(table);
     completeTableConfiguration();
@@ -129,6 +134,8 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
     executor.shutdown();
     executor.awaitTermination(10, TimeUnit.MINUTES);
     uploadSegments(_tarDir);
+
+    waitForAllDocsLoaded(600_000L);
   }
 
   private void setupRealtimeTable(String table) throws Exception {
@@ -151,6 +158,14 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
     completeTableConfiguration();
   }
 
+  @Override
+  public String getTableName() {
+    return _currentTableName;
+  }
+
+  private void setTableName(String tableName) {
+    _currentTableName = tableName;
+  }
   /**
    * After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
    * Validate that we are seeing the expected numbers
@@ -159,6 +174,20 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
   public void testSegmentStatusChecker() {
     ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
 
+    long millisToWait = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
+    while (controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+        "SegmentStatusChecker") < NUM_TABLES && millisToWait > 0) {
+      try {
+        Thread.sleep(1000);
+        millisToWait -= 1000;
+      } catch (InterruptedException e) {
+        LOGGER.info("Interrupted while waiting for SegmentStatusChecker");
+      }
+    }
+
+    Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+        "SegmentStatusChecker"), NUM_TABLES);
+
     // empty table - table1_OFFLINE
     // num replicas set from ideal state
     Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.NUMBER_OF_REPLICAS), 3);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org