You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/04 23:51:15 UTC

[incubator-pinot] 01/01: Split ValidationManager duties into separate ControllerPeriodicTasks

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch split_validation_manager_periodic_tasks
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a786bb90e5fbf1617a338c36b416733a15beb1a5
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 4 15:50:52 2019 -0800

    Split ValidationManager duties into separate ControllerPeriodicTasks
---
 .../linkedin/pinot/controller/ControllerConf.java  | 135 +++++++++++------
 .../pinot/controller/ControllerStarter.java        |  41 ++++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   4 +-
 .../helix/core/retention/RetentionManager.java     |  12 +-
 .../BrokerResourceValidationManager.java           |  80 ++++++++++
 ...ger.java => OfflineSegmentIntervalChecker.java} | 116 ++-------------
 .../RealtimeSegmentValidationManager.java          | 162 +++++++++++++++++++++
 .../helix/core/retention/RetentionManagerTest.java |  11 +-
 .../validation/ValidationManagerTest.java          |  14 +-
 .../tests/SegmentCompletionIntegrationTests.java   |   4 +-
 .../admin/command/StartControllerCommand.java      |   4 +-
 11 files changed, 406 insertions(+), 177 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index b391765..f2b7f9f 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -51,15 +51,40 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
   private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
-  private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
-  private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
-  private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
-  private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "controller.realtime.segment.relocator.frequency";
-  private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds";
+
+  public static class ControllerPeriodicTasksConf {
+    private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
+    private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+        "controller.offline.segment.interval.checker.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.realtime.segment.validation.frequencyInSeconds";
+    private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.broker.resource.validation.frequencyInSeconds";
+    private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
+    private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+        "controller.statuschecker.waitForPushTimeInSeconds";
+    private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+        "controller.realtime.segment.relocator.frequency";
+    // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
+    // separate interval
+    private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+        "controller.segment.level.validation.intervalInSeconds";
+
+    private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+    private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+    private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+    private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
+    private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+  }
+
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds";
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays";
-  private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
   private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   private static final String ENABLE_SPLIT_COMMIT = "controller.enable.split.commit";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -70,25 +95,17 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
-  // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
-  // separate interval
-  private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
-      "controller.segment.level.validation.intervalInSeconds";
+
   private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
   private static final String PINOT_FS_FACTORY_CLASS_LOCAL = "controller.storage.factory.class.file";
 
-  private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-  private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-  private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
-  private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
-  private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
   private static final long DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
   private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
-  private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
   private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
   private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
   private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -98,7 +115,6 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
-  private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -321,58 +337,91 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getRetentionControllerFrequencyInSeconds() {
-    if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setRetentionControllerFrequencyInSeconds(int retentionFrequencyInSeconds) {
-    setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(retentionFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+        Integer.toString(retentionFrequencyInSeconds));
+  }
+
+  public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+    }
+    return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+  }
+
+  public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  public int getRealtimeSegmentValidationFrequencyInSeconds() {
+
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+    }
+    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+  }
+
+  public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+  public int getBrokerResourceValidationFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS;
   }
 
-  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
-    setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(validationFrequencyInSeconds));
+  public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
   public int getStatusCheckerFrequencyInSeconds() {
-    if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setStatusCheckerFrequencyInSeconds(int statusCheckerFrequencyInSeconds) {
-    setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, Integer.toString(statusCheckerFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
   public String getRealtimeSegmentRelocatorFrequency() {
-    if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
-      return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
+      return (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
     }
-    return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
   }
 
   public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
-    setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
   }
 
   public int getStatusCheckerWaitForPushTimeInSeconds() {
-    if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
   }
 
   public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPushTimeInSeconds) {
-    setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+        Integer.toString(statusCheckerWaitForPushTimeInSeconds));
   }
 
   public long getExternalViewOnlineToOfflineTimeout() {
@@ -414,11 +463,12 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getTaskManagerFrequencyInSeconds() {
-    return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
   }
 
   public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
-    setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
   }
 
   public int getDefaultTableMinReplicas() {
@@ -466,6 +516,7 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getSegmentLevelValidationIntervalInSeconds() {
-    return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 1e8552a..1f64920 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -38,7 +38,9 @@ import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -83,7 +85,9 @@ public class ControllerStarter {
   private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
-  private ValidationManager _validationManager;
+  private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+  private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+  private BrokerResourceValidationManager _brokerResourceValidationManager;
   private RealtimeSegmentRelocator _realtimeSegmentRelocator;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotTaskManager _taskManager;
@@ -92,8 +96,7 @@ public class ControllerStarter {
     _config = conf;
     _adminApp = new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
     _helixResourceManager = new PinotHelixResourceManager(_config);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config.getRetentionControllerFrequencyInSeconds(),
-        _config.getDeletedSegmentsRetentionInDays());
+    _retentionManager = new RetentionManager(_helixResourceManager, _config);
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
@@ -108,8 +111,16 @@ public class ControllerStarter {
     return _helixResourceManager;
   }
 
-  public ValidationManager getValidationManager() {
-    return _validationManager;
+  public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+    return _offlineSegmentIntervalChecker;
+  }
+
+  public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
+    return _realtimeSegmentValidationManager;
+  }
+
+  public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+    return _brokerResourceValidationManager;
   }
 
   public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -178,10 +189,18 @@ public class ControllerStarter {
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
     periodicTasks.add(_retentionManager);
-    _validationManager =
-        new ValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+    _offlineSegmentIntervalChecker =
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+            new ValidationMetrics(_metricsRegistry));
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
             new ValidationMetrics(_metricsRegistry));
-    periodicTasks.add(_validationManager);
+    _brokerResourceValidationManager =
+        new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+    periodicTasks.add(_offlineSegmentIntervalChecker);
+    periodicTasks.add(_realtimeSegmentValidationManager);
+    periodicTasks.add(_brokerResourceValidationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
 
@@ -348,7 +367,9 @@ public class ControllerStarter {
     conf.setControllerVipHost("localhost");
     conf.setControllerVipProtocol("http");
     conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-    conf.setValidationControllerFrequencyInSeconds(3600);
+    conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+    conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+    conf.setBrokerResourceValidationFrequencyInSeconds(3600);
     conf.setStatusCheckerFrequencyInSeconds(5 * 60);
     conf.setRealtimeSegmentRelocatorFrequency("1h");
     conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 486231a..7b5093e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -815,8 +815,8 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * An instance is reporting that it has stopped consuming a topic due to some error.
    * Mark the state of the segment to be OFFLINE in idealstate.
-   * When all replicas of this segment are marked offline, the ValidationManager, in its next
-   * run, will auto-create a new segment with the appropriate offset.
+   * When all replicas of this segment are marked offline, the {@link com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+   * in its next run, will auto-create a new segment with the appropriate offset.
    */
   public void segmentStoppedConsuming(final LLCSegmentName segmentName, final String instance) {
     String rawTableName = segmentName.getTableName();
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index 134c4ee..36367f9 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -23,6 +23,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -49,10 +50,9 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-      int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
-    _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+    super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+    _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -145,7 +145,7 @@ public class RetentionManager extends ControllerPeriodicTask {
         // In progress segment, only check LLC segment
         if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
           // Delete old LLC segment that hangs around. Do not delete segment that are current since there may be a race
-          // with ValidationManager trying to auto-create the LLC segment
+          // with RealtimeSegmentValidationManager trying to auto-create the LLC segment
           if (shouldDeleteInProgressLLCSegment(segmentName, idealState, realtimeSegmentZKMetadata)) {
             segmentsToDelete.add(segmentName);
           }
@@ -169,7 +169,7 @@ public class RetentionManager extends ControllerPeriodicTask {
       return false;
     }
     // delete a segment only if it is old enough (5 days) or else,
-    // 1. latest segment could get deleted in the middle of repair by ValidationManager
+    // 1. latest segment could get deleted in the middle of repair by RealtimeSegmentValidationManager
     // 2. for a brand new segment, if this code kicks in after new metadata is created but ideal state entry is not yet created (between step 2 and 3),
     // the latest segment metadata could get marked for deletion
     if (System.currentTimeMillis() - realtimeSegmentZKMetadata.getCreationTime()
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..e3264c4
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Rebuilds the broker resource if the instance set has changed
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+  private List<InstanceConfig> _instanceConfigs;
+
+  public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+    super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Cache instance configs to reduce ZK access
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+        return;
+      }
+
+      // Rebuild broker resource
+      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+          tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+    }
+  }
+
+
+  @Override
+  protected void postprocess() {
+
+  }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 64%
rename from pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 7d06289..f35b287 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -16,29 +16,20 @@
 package com.linkedin.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
 import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
@@ -49,53 +40,26 @@ import org.slf4j.LoggerFactory;
  * Manages the segment validation metrics, to ensure that all offline segments are contiguous (no missing segments) and
  * that the offline push delay isn't too high.
  */
-public class ValidationManager extends ControllerPeriodicTask {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  private final int _segmentLevelValidationIntervalInSeconds;
-  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
-  private final ValidationMetrics _validationMetrics;
+  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  protected final ValidationMetrics _validationMetrics;
 
-  private long _lastSegmentLevelValidationTimeMs = 0L;
-  private boolean _runSegmentLevelValidation;
-  private List<InstanceConfig> _instanceConfigs;
-
-  public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+  public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
-    super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
-    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
-    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+    super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+        pinotHelixResourceManager);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
   }
 
   @Override
   protected void preprocess() {
-    // Run segment level validation using a separate interval
-    _runSegmentLevelValidation = false;
-    long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
-        >= _segmentLevelValidationIntervalInSeconds) {
-      LOGGER.info("Run segment-level validation");
-      _runSegmentLevelValidation = true;
-      _lastSegmentLevelValidationTimeMs = currentTimeMs;
-    }
-
-    // Cache instance configs to reduce ZK access
-    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    runValidation(tableNameWithType);
-  }
-
-  @Override
-  protected void postprocess() {
-
-  }
-
-  private void runValidation(String tableNameWithType) {
     try {
       TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
       if (tableConfig == null) {
@@ -103,29 +67,12 @@ public class ValidationManager extends ControllerPeriodicTask {
         return;
       }
 
-      // Rebuild broker resource
-      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
-          tableConfig.getTenantConfig().getBroker());
-      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
-
-      // Perform validation based on the table type
       CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-        if (_runSegmentLevelValidation) {
-          validateOfflineSegmentPush(tableConfig);
-        }
-      } else {
-        if (_runSegmentLevelValidation) {
-          updateRealtimeDocumentCount(tableConfig);
-        }
-        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-        if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
-        }
+        validateOfflineSegmentPush(tableConfig);
       }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
+      LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
     }
   }
 
@@ -261,50 +208,9 @@ public class ValidationManager extends ControllerPeriodicTask {
     return numTotalDocs;
   }
 
-  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
-    String realtimeTableName = tableConfig.getTableName();
-    List<RealtimeSegmentZKMetadata> metadataList =
-        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC segments (i.e. fully migrated)
-    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
-    if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
-  }
-
-  @VisibleForTesting
-  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
-      boolean countHLCSegments) {
-    long numTotalDocs = 0;
-
-    String groupId = "";
-    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
-      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
-        if (countHLCSegments) {
-          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-          String segmentGroupIdName = hlcSegmentName.getGroupId();
-
-          if (groupId.isEmpty()) {
-            groupId = segmentGroupIdName;
-          }
-          // Discard all segments with different groupids as they are replicas
-          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
-            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-          }
-        }
-      } else {
-        // Low level segments
-        if (!countHLCSegments) {
-          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-        }
-      }
-    }
+  @Override
+  protected void postprocess() {
 
-    return numTotalDocs;
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..71e50d2
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task which validates realtime ideal states and segment metadata, fixing any partitions which have stopped
+ * consuming. Tables of any other type are skipped.
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  protected final ValidationMetrics _validationMetrics;
+
+  private final int _segmentLevelValidationIntervalInSeconds;
+  private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+  private boolean _updateRealtimeDocumentCount;
+
+  public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+    super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _validationMetrics = validationMetrics;
+    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
+    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Update realtime document counts only if the segment-level validation interval has elapsed since the last update
+    _updateRealtimeDocumentCount = false;
+    long currentTimeMs = System.currentTimeMillis();
+    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastUpdateRealtimeDocumentCountTimeMs)
+        >= _segmentLevelValidationIntervalInSeconds) {
+      LOGGER.info("Run segment-level validation");
+      _updateRealtimeDocumentCount = true;
+      _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+    }
+  }
+
+  /** Runs realtime validation for one table (non-REALTIME tables are skipped); failures are logged, not thrown. */
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+
+      // Everything below applies to REALTIME tables only; tables of any other type are skipped by this task
+      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+        if (_updateRealtimeDocumentCount) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
+    }
+  }
+
+  /** Updates the total document count gauge of the given realtime table from its segment ZK metadata. */
+  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    List<RealtimeSegmentZKMetadata> metadataList =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
+    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+    // Count HLC segments unless this table has ONLY LLC segments (i.e. fully migrated)
+    boolean countHLCSegments = !streamConfig.hasLowLevelConsumerType() || streamConfig.hasHighLevelConsumerType();
+    // Update the gauge to contain the total document count in the segments
+    _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
+  }
+
+  /**
+   * Sums raw document counts: HLC segments of a single group id if {@code countHLCSegments}, else only LLC segments.
+   */
+  @VisibleForTesting
+  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
+      boolean countHLCSegments) {
+    long numTotalDocs = 0;
+    String groupId = "";
+    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
+      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+        if (countHLCSegments) {
+          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
+          String segmentGroupIdName = hlcSegmentName.getGroupId();
+          if (groupId.isEmpty()) {
+            groupId = segmentGroupIdName;
+          }
+          // Discard all segments with different groupids as they are replicas
+          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+          }
+        }
+      } else if (!countHLCSegments) {
+        // Low level segments
+        // NOTE(review): unlike the HLC branch, negative doc counts are not filtered out here; confirm intended
+        numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+      }
+    }
+    return numTotalDocs;
+  }
+
+  @Override
+  protected void postprocess() {
+    // Nothing to do after a run
+  }
+
+  @Override
+  protected void initTask() {
+    // No task-specific initialization needed
+  }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
+}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index c27b0ef..6f384c7 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -23,6 +23,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -84,7 +85,10 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
@@ -201,7 +205,10 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 6d5394a..861623f 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -49,7 +49,7 @@ import static org.testng.Assert.*;
 
 
 /**
- * Tests for the ValidationManager.
+ * Tests for the validation periodic tasks (OfflineSegmentIntervalChecker and RealtimeSegmentValidationManager).
  */
 public class ValidationManagerTest {
   private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -175,7 +175,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(
         SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName4, 20));
 
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
 
     // Now add some low level segment names
     String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 1000).getSegmentName();
@@ -185,7 +185,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName6, 5));
 
     // Only the LLC segments should get counted.
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
   }
 
   @AfterClass
@@ -207,18 +207,18 @@ public class ValidationManagerTest {
     jan1st2nd3rd.add(jan1st);
     jan1st2nd3rd.add(jan2nd);
     jan1st2nd3rd.add(jan3rd);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
 
     ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
     jan1st2nd3rd5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
 
     // Should also work if the intervals are in random order
     ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
     jan5th2nd1st.add(jan5th);
     jan5th2nd1st.add(jan2nd);
     jan5th2nd1st.add(jan1st);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
 
     // Should also work if the intervals are of different sizes
     Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59));
@@ -226,6 +226,6 @@ public class ValidationManagerTest {
     jan1st2nd4th5th.add(jan1stAnd2nd);
     jan1st2nd4th5th.add(jan4th);
     jan1st2nd4th5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index e7fb540..ac4aad0 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -25,7 +25,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.common.utils.NetUtil;
 import com.linkedin.pinot.common.utils.ZkStarter;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
 import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -152,7 +152,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
     final String oldSegment = _currentSegment;
 
     // Now call the validation manager, and the segment should fix itself
-    ValidationManager validationManager = _controllerStarter.getValidationManager();
+    RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
     validationManager.init();
     validationManager.run();
 
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index e27e71a..3da214f 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -147,7 +147,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
         conf.setTenantIsolationEnabled(_tenantIsolation);
 
         conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-        conf.setValidationControllerFrequencyInSeconds(3600);
+        conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+        conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+        conf.setBrokerResourceValidationFrequencyInSeconds(3600);
       }
 
       LOGGER.info("Executing command: " + toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org