You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/09 01:37:17 UTC

[incubator-pinot] branch master updated: Split validation manager tasks into separate periodic tasks (#3668)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 493ed64  Split validation manager tasks into separate periodic tasks (#3668)
493ed64 is described below

commit 493ed643e1e5a852cf2549cfe76ef4b3f1e7e0d4
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Jan 8 17:37:12 2019 -0800

    Split validation manager tasks into separate periodic tasks (#3668)
    
    Split ValidationManager duties into separate ControllerPeriodicTasks, viz. OfflineSegmentIntervalChecker (to perform checks related to offline segments), RealtimeSegmentValidationManager (to validate realtime consuming partitions), and BrokerResourceValidationManager (to validate the broker resource). These have been split out because they need to run at different frequencies from each other.
---
 .../linkedin/pinot/controller/ControllerConf.java  | 161 ++++++++++++++------
 .../pinot/controller/ControllerStarter.java        |  40 +++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  10 +-
 .../helix/core/retention/RetentionManager.java     |  12 +-
 .../BrokerResourceValidationManager.java           |  82 ++++++++++
 ...ger.java => OfflineSegmentIntervalChecker.java} | 127 ++--------------
 .../RealtimeSegmentValidationManager.java          | 167 +++++++++++++++++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        |  26 ++--
 .../helix/core/retention/RetentionManagerTest.java |  11 +-
 .../validation/ValidationManagerTest.java          |  14 +-
 .../tests/SegmentCompletionIntegrationTests.java   |   4 +-
 .../admin/command/StartControllerCommand.java      |   4 +-
 12 files changed, 459 insertions(+), 199 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index df0c2df..863f3cc 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -54,15 +54,43 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
   private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
-  private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
-  private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
-  private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
-  private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "controller.realtime.segment.relocator.frequency";
-  private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds";
+
+  public static class ControllerPeriodicTasksConf {
+    private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
+    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+    private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+        "controller.validation.frequencyInSeconds";
+    private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+        "controller.offline.segment.interval.checker.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.realtime.segment.validation.frequencyInSeconds";
+    private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.broker.resource.validation.frequencyInSeconds";
+    private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
+    private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+        "controller.statuschecker.waitForPushTimeInSeconds";
+    private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+        "controller.realtime.segment.relocator.frequency";
+    // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
+    // separate interval
+    private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+        "controller.segment.level.validation.intervalInSeconds";
+
+    private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
+    private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+    private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+    private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+    private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
+    private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+  }
+
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds";
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays";
-  private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
   private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   private static final String ENABLE_SPLIT_COMMIT = "controller.enable.split.commit";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -73,25 +101,17 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
-  // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
-  // separate interval
-  private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
-      "controller.segment.level.validation.intervalInSeconds";
+
   private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
   private static final String PINOT_FS_FACTORY_CLASS_LOCAL = "controller.storage.factory.class.file";
 
-  private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-  private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-  private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
-  private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
-  private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
   private static final long DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
   private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
-  private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
   private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
   private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
   private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -101,7 +121,6 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
-  private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -324,58 +343,114 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getRetentionControllerFrequencyInSeconds() {
-    if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setRetentionControllerFrequencyInSeconds(int retentionFrequencyInSeconds) {
-    setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(retentionFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+        Integer.toString(retentionFrequencyInSeconds));
   }
 
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+  /**
+   * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the segment level validation interval. This is done in order to retain the current behavior,
+   * wherein the offline validation tasks were done at segment level validation interval frequency.
+   * The default value is the new DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+    }
+    return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  /**
+   * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the realtime validation tasks were done at validation controller frequency.
+   * The default value is the new DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getRealtimeSegmentValidationFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+    }
+    return getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS);
+  }
+
+  public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  /**
+   * Returns the config value for controller.broker.resource.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the broker resource validation tasks were done at validation controller frequency.
+   * The default value is the new DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS
+   * @return
+   */
+  public int getBrokerResourceValidationFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS);
   }
 
-  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
-    setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(validationFrequencyInSeconds));
+  public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
   public int getStatusCheckerFrequencyInSeconds() {
-    if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setStatusCheckerFrequencyInSeconds(int statusCheckerFrequencyInSeconds) {
-    setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, Integer.toString(statusCheckerFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
   public String getRealtimeSegmentRelocatorFrequency() {
-    if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
-      return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
+      return (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
     }
-    return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
   }
 
   public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
-    setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
   }
 
   public int getStatusCheckerWaitForPushTimeInSeconds() {
-    if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
   }
 
   public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPushTimeInSeconds) {
-    setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+        Integer.toString(statusCheckerWaitForPushTimeInSeconds));
   }
 
   public long getExternalViewOnlineToOfflineTimeout() {
@@ -417,11 +492,12 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getTaskManagerFrequencyInSeconds() {
-    return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
   }
 
   public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
-    setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
   }
 
   public int getDefaultTableMinReplicas() {
@@ -469,6 +545,7 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getSegmentLevelValidationIntervalInSeconds() {
-    return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 2799b44..2bca8ae 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -41,7 +41,9 @@ import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -86,7 +88,9 @@ public class ControllerStarter {
   private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
-  private ValidationManager _validationManager;
+  private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+  private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+  private BrokerResourceValidationManager _brokerResourceValidationManager;
   private RealtimeSegmentRelocator _realtimeSegmentRelocator;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotTaskManager _taskManager;
@@ -95,8 +99,7 @@ public class ControllerStarter {
     _config = conf;
     _adminApp = new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
     _helixResourceManager = new PinotHelixResourceManager(_config);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config.getRetentionControllerFrequencyInSeconds(),
-        _config.getDeletedSegmentsRetentionInDays());
+    _retentionManager = new RetentionManager(_helixResourceManager, _config);
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
@@ -111,8 +114,16 @@ public class ControllerStarter {
     return _helixResourceManager;
   }
 
-  public ValidationManager getValidationManager() {
-    return _validationManager;
+  public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+    return _offlineSegmentIntervalChecker;
+  }
+
+  public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
+    return _realtimeSegmentValidationManager;
+  }
+
+  public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+    return _brokerResourceValidationManager;
   }
 
   public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -181,10 +192,17 @@ public class ControllerStarter {
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
     periodicTasks.add(_retentionManager);
-    _validationManager =
-        new ValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+    _offlineSegmentIntervalChecker =
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry));
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
             new ValidationMetrics(_metricsRegistry));
-    periodicTasks.add(_validationManager);
+    _brokerResourceValidationManager =
+        new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+    periodicTasks.add(_offlineSegmentIntervalChecker);
+    periodicTasks.add(_realtimeSegmentValidationManager);
+    periodicTasks.add(_brokerResourceValidationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
 
@@ -351,7 +369,9 @@ public class ControllerStarter {
     conf.setControllerVipHost("localhost");
     conf.setControllerVipProtocol("http");
     conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-    conf.setValidationControllerFrequencyInSeconds(3600);
+    conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+    conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+    conf.setBrokerResourceValidationFrequencyInSeconds(3600);
     conf.setStatusCheckerFrequencyInSeconds(5 * 60);
     conf.setRealtimeSegmentRelocatorFrequency("1h");
     conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9860ec1..a77750e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -818,8 +818,8 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * An instance is reporting that it has stopped consuming a topic due to some error.
    * Mark the state of the segment to be OFFLINE in idealstate.
-   * When all replicas of this segment are marked offline, the ValidationManager, in its next
-   * run, will auto-create a new segment with the appropriate offset.
+   * When all replicas of this segment are marked offline, the {@link com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+   * in its next run, will auto-create a new segment with the appropriate offset.
    */
   public void segmentStoppedConsuming(final LLCSegmentName segmentName, final String instance) {
     String rawTableName = segmentName.getTableName();
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
    *
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and reset it to 0 at the end of validateLLC
    */
-  public void validateLLCSegments(final TableConfig tableConfig) {
+  public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
     final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
-        return validateLLCSegments(tableConfig, idealState, partitionCount);
+        return ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount);
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
   }
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: split this method into multiple smaller methods
    */
   @VisibleForTesting
-  protected IdealState validateLLCSegments(final TableConfig tableConfig, IdealState idealState,
+  protected IdealState ensureAllPartitionsConsuming(final TableConfig tableConfig, IdealState idealState,
       final int partitionCount) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index f177a80..726c481 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -52,10 +53,9 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-      int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
-    _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+    super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+    _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -148,7 +148,7 @@ public class RetentionManager extends ControllerPeriodicTask {
         // In progress segment, only check LLC segment
         if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
           // Delete old LLC segment that hangs around. Do not delete segment that are current since there may be a race
-          // with ValidationManager trying to auto-create the LLC segment
+          // with RealtimeSegmentValidationManager trying to auto-create the LLC segment
           if (shouldDeleteInProgressLLCSegment(segmentName, idealState, realtimeSegmentZKMetadata)) {
             segmentsToDelete.add(segmentName);
           }
@@ -172,7 +172,7 @@ public class RetentionManager extends ControllerPeriodicTask {
       return false;
     }
     // delete a segment only if it is old enough (5 days) or else,
-    // 1. latest segment could get deleted in the middle of repair by ValidationManager
+    // 1. latest segment could get deleted in the middle of repair by RealtimeSegmentValidationManager
     // 2. for a brand new segment, if this code kicks in after new metadata is created but ideal state entry is not yet created (between step 2 and 3),
     // the latest segment metadata could get marked for deletion
     if (System.currentTimeMillis() - realtimeSegmentZKMetadata.getCreationTime()
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..40078d6
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic controller task that, on every run, asks the resource manager to rebuild each table's broker resource
+ * from the current instances of the table's broker tenant.
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+  // Instance configs fetched once per run in preprocess() and reused for every table of that run
+  private List<InstanceConfig> _instanceConfigs;
+
+  /**
+   * @param config controller configuration; supplies the run frequency of this task
+   * @param pinotHelixResourceManager used to read table configs and rebuild broker resources
+   */
+  public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+    super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+  }
+
+  /**
+   * Caches all Helix instance configs so they are fetched once per run rather than once per table.
+   */
+  @Override
+  protected void preprocess() {
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
+
+  /**
+   * Rebuilds the broker resource of a single table from the instances of its broker tenant. A missing table
+   * config and any exception are logged as warnings and never propagate.
+   */
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig != null) {
+        Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+            tableConfig.getTenantConfig().getBroker());
+        _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+      } else {
+        LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+    }
+  }
+
+  @Override
+  protected void postprocess() {
+    // Nothing to do after all tables have been processed
+  }
+
+  @Override
+  protected void initTask() {
+    // No initialization required
+  }
+
+  @Override
+  public void stopTask() {
+    // Nothing to clean up
+  }
+}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 62%
rename from pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index c5528e7..3c79205 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -19,29 +19,19 @@
 package com.linkedin.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
 import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
-import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
@@ -52,83 +42,39 @@ import org.slf4j.LoggerFactory;
  * Manages the segment validation metrics, to ensure that all offline segments are contiguous (no missing segments) and
  * that the offline push delay isn't too high.
  */
-public class ValidationManager extends ControllerPeriodicTask {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  private final int _segmentLevelValidationIntervalInSeconds;
-  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
 
-  private long _lastSegmentLevelValidationTimeMs = 0L;
-  private boolean _runSegmentLevelValidation;
-  private List<InstanceConfig> _instanceConfigs;
-
-  public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
-    super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
-    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
-    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
-    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+  public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      ValidationMetrics validationMetrics) {
+    super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+        pinotHelixResourceManager);
     _validationMetrics = validationMetrics;
   }
 
   @Override
   protected void preprocess() {
-    // Run segment level validation using a separate interval
-    _runSegmentLevelValidation = false;
-    long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
-        >= _segmentLevelValidationIntervalInSeconds) {
-      LOGGER.info("Run segment-level validation");
-      _runSegmentLevelValidation = true;
-      _lastSegmentLevelValidationTimeMs = currentTimeMs;
-    }
-
-    // Cache instance configs to reduce ZK access
-    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    runValidation(tableNameWithType);
-  }
-
-  @Override
-  protected void postprocess() {
-
-  }
-
-  private void runValidation(String tableNameWithType) {
     try {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-        return;
-      }
-
-      // Rebuild broker resource
-      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
-          tableConfig.getTenantConfig().getBroker());
-      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
 
-      // Perform validation based on the table type
       CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-        if (_runSegmentLevelValidation) {
-          validateOfflineSegmentPush(tableConfig);
-        }
-      } else {
-        if (_runSegmentLevelValidation) {
-          updateRealtimeDocumentCount(tableConfig);
-        }
-        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-        if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+
+        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+          return;
         }
+
+        validateOfflineSegmentPush(tableConfig);
       }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
+      LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
     }
   }
 
@@ -264,50 +210,9 @@ public class ValidationManager extends ControllerPeriodicTask {
     return numTotalDocs;
   }
 
-  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
-    String realtimeTableName = tableConfig.getTableName();
-    List<RealtimeSegmentZKMetadata> metadataList =
-        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC segments (i.e. fully migrated)
-    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
-    if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
-  }
-
-  @VisibleForTesting
-  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
-      boolean countHLCSegments) {
-    long numTotalDocs = 0;
-
-    String groupId = "";
-    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
-      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
-        if (countHLCSegments) {
-          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-          String segmentGroupIdName = hlcSegmentName.getGroupId();
-
-          if (groupId.isEmpty()) {
-            groupId = segmentGroupIdName;
-          }
-          // Discard all segments with different groupids as they are replicas
-          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
-            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-          }
-        }
-      } else {
-        // Low level segments
-        if (!countHLCSegments) {
-          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-        }
-      }
-    }
+  @Override
+  protected void postprocess() {
 
-    return numTotalDocs;
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..3274df3
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming.
+ *
+ * <p>On every run, each realtime table whose stream config has a low-level consumer (LLC) type is passed to
+ * {@code PinotLLCRealtimeSegmentManager.ensureAllPartitionsConsuming}. In addition, at most once per
+ * segment-level validation interval, the total document count gauge of every realtime table is refreshed.
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final ValidationMetrics _validationMetrics;
+
+  // Minimum number of seconds between two refreshes of the realtime document count gauges
+  private final int _segmentLevelValidationIntervalInSeconds;
+  // Wall-clock time (ms) of the last refresh; starting at 0 makes the first run always refresh
+  private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+  // Decided once per run in preprocess() and read by processTable() for every table of that run
+  private boolean _updateRealtimeDocumentCount;
+
+  /**
+   * @param config controller configuration; supplies the run frequency and the segment-level validation interval
+   * @param pinotHelixResourceManager used to read table configs and realtime segment metadata
+   * @param llcRealtimeSegmentManager used to repair LLC partitions that have stopped consuming
+   * @param validationMetrics receives the realtime total document count gauges
+   * @throws IllegalStateException if the configured segment-level validation interval is not positive
+   */
+  public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+    super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _validationMetrics = validationMetrics;
+
+    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
+    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0,
+        "Segment level validation interval must be positive, got: %s", _segmentLevelValidationIntervalInSeconds);
+  }
+
+  /**
+   * Decides whether this run refreshes the realtime document counts, based on how much time has passed since
+   * the previous refresh.
+   */
+  @Override
+  protected void preprocess() {
+    _updateRealtimeDocumentCount = false;
+    long currentTimeMs = System.currentTimeMillis();
+    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastUpdateRealtimeDocumentCountTimeMs)
+        >= _segmentLevelValidationIntervalInSeconds) {
+      LOGGER.info("Run segment-level validation");
+      _updateRealtimeDocumentCount = true;
+      _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+    }
+  }
+
+  /**
+   * Validates a single realtime table; tables of any other type are ignored. A missing table config and any
+   * exception are logged as warnings and never propagate.
+   */
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType != CommonConstants.Helix.TableType.REALTIME) {
+        return;
+      }
+
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+
+      if (_updateRealtimeDocumentCount) {
+        updateRealtimeDocumentCount(tableConfig);
+      }
+
+      // Only tables consuming through LLC get their partitions repaired
+      Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+      StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+      if (streamConfig.hasLowLevelConsumerType()) {
+        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
+    }
+  }
+
+  /**
+   * Refreshes the total document count gauge of a realtime table from its segment ZK metadata.
+   */
+  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    List<RealtimeSegmentZKMetadata> metadataList =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
+    // Count HLC segments unless the table consumes ONLY through LLC (i.e. it is fully migrated)
+    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+    boolean countHLCSegments = !streamConfig.hasLowLevelConsumerType() || streamConfig.hasHighLevelConsumerType();
+    // Update the gauge to contain the total document count in the segments
+    _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
+  }
+
+  /**
+   * Computes the total number of documents across realtime segments.
+   *
+   * <p>When {@code countHLCSegments} is true, only high-level consumer (HLC) segments are counted, and only those
+   * of the first group id encountered, since segments with other group ids are replicas. Otherwise only low-level
+   * consumer (LLC) segments are counted. In both cases segments whose raw document count is negative are skipped,
+   * so that metadata without a valid count cannot reduce the total.
+   *
+   * @param realtimeSegmentZKMetadataList ZK metadata of the table's realtime segments
+   * @param countHLCSegments whether to count HLC segments (true) or LLC segments (false)
+   * @return the summed raw document count of the counted segments
+   */
+  @VisibleForTesting
+  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
+      boolean countHLCSegments) {
+    long numTotalDocs = 0;
+
+    String groupId = "";
+    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
+      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+        if (countHLCSegments) {
+          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
+          String segmentGroupIdName = hlcSegmentName.getGroupId();
+
+          if (groupId.isEmpty()) {
+            groupId = segmentGroupIdName;
+          }
+          // Discard all segments with different groupids as they are replicas
+          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+          }
+        }
+      } else {
+        // Low level segments; a negative raw document count is skipped exactly as in the HLC branch
+        if (!countHLCSegments && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+        }
+      }
+    }
+
+    return numTotalDocs;
+  }
+
+  @Override
+  protected void postprocess() {
+    // Nothing to do after all tables have been processed
+  }
+
+  @Override
+  protected void initTask() {
+    // No initialization required
+  }
+
+  /**
+   * Unregisters all validation metrics.
+   *
+   * <p>NOTE(review): judging by its name, {@code unregisterAllMetrics()} is not limited to realtime tables; if this
+   * {@code ValidationMetrics} instance is shared with other periodic tasks, their gauges would be removed too —
+   * confirm this is intended.
+   */
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
+}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       oldMetadataMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
     }
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
-    IdealState updatedIdealState = segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    IdealState updatedIdealState = segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     Map<String, Map<String, String>> updatedMapFields = updatedIdealState.getRecord().getMapFields();
     Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = segmentManager._metadataMap;
 
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate that all entries in oldMapFields are unchanged in new ideal state
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate nothing changed and try again with disabled
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING and old segment ONLINE
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
               (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
         } else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
             Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
             if (tooSoonToCorrect) {
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
               // validate nothing changed and try again with disabled
               verifyNoChangeToOldEntries(oldMapFields, idealState);
               segmentManager.tooSoonToCorrect = false;
             }
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
             verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
           } else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
               if (tooSoonToCorrect) {
-                segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+                segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
                 // validate nothing changed and try again with disabled
                 verifyNoChangeToOldEntries(oldMapFields, idealState);
                 segmentManager.tooSoonToCorrect = false;
               }
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
             } else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               // verify that nothing changed
               verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
 
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     PartitionAssignment partitionAssignment =
         segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
             idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig tableConfig, int nPartitions) {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     return idealStateBuilder.build();
   }
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 3e60350..d2fe9b3 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -87,7 +88,10 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
@@ -204,7 +208,10 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 0526afb..f69657b 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -52,7 +52,7 @@ import static org.testng.Assert.*;
 
 
 /**
- * Tests for the ValidationManager.
+ * Tests for the ValidationManagers.
  */
 public class ValidationManagerTest {
   private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -178,7 +178,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(
         SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName4, 20));
 
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
 
     // Now add some low level segment names
     String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 1000).getSegmentName();
@@ -188,7 +188,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName6, 5));
 
     // Only the LLC segments should get counted.
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
   }
 
   @AfterClass
@@ -210,18 +210,18 @@ public class ValidationManagerTest {
     jan1st2nd3rd.add(jan1st);
     jan1st2nd3rd.add(jan2nd);
     jan1st2nd3rd.add(jan3rd);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
 
     ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
     jan1st2nd3rd5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
 
     // Should also work if the intervals are in random order
     ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
     jan5th2nd1st.add(jan5th);
     jan5th2nd1st.add(jan2nd);
     jan5th2nd1st.add(jan1st);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
 
     // Should also work if the intervals are of different sizes
     Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59));
@@ -229,6 +229,6 @@ public class ValidationManagerTest {
     jan1st2nd4th5th.add(jan1stAnd2nd);
     jan1st2nd4th5th.add(jan4th);
     jan1st2nd4th5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 28eb463..5ef1278 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -28,7 +28,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.common.utils.NetUtil;
 import com.linkedin.pinot.common.utils.ZkStarter;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
 import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -155,7 +155,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
     final String oldSegment = _currentSegment;
 
     // Now call the validation manager, and the segment should fix itself
-    ValidationManager validationManager = _controllerStarter.getValidationManager();
+    RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
     validationManager.init();
     validationManager.run();
 
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index 49c56b1..aa82182 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -150,7 +150,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
         conf.setTenantIsolationEnabled(_tenantIsolation);
 
         conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-        conf.setValidationControllerFrequencyInSeconds(3600);
+        conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+        conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+        conf.setBrokerResourceValidationFrequencyInSeconds(3600);
       }
 
       LOGGER.info("Executing command: " + toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org