Posted to commits@pinot.apache.org by ne...@apache.org on 2019/01/08 23:23:23 UTC

[incubator-pinot] branch split_vm_tasks_2 created (now 96255c9)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a change to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 96255c9  Change license header

This branch includes the following new commits:

     new f7b13ea  Split ValidationManager duties into separate ControllerPeriodicTasks
     new f509558  Change access modifier of some fields
     new 1e58eed  Move some code inside the if check for table type
     new 0ff4723  Review comments
     new 7525914  Use controller validation frequency as default values for new periodic validation tasks
     new 7f624ee  Change DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS to 24 hours, to match current default behavior (segment level validation interval)
     new 01fdea1  Using new defaults for each periodic task
     new 96255c9  Change license header

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 08/08: Change license header

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 96255c9da02f914078bb47e981fccdc1ac3c4833
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Jan 8 15:21:27 2019 -0800

    Change license header
---
 .../BrokerResourceValidationManager.java           | 25 ++++++++++++----------
 .../RealtimeSegmentValidationManager.java          | 25 ++++++++++++----------
 2 files changed, 28 insertions(+), 22 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
index 1f09b64..40078d6 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -1,17 +1,20 @@
 /**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package com.linkedin.pinot.controller.validation;
 
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 77ed13a..3274df3 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -1,17 +1,20 @@
 /**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package com.linkedin.pinot.controller.validation;
 




[incubator-pinot] 03/08: Move some code inside the if check for table type

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1e58eed1e89e91e3b95272415863f4127f796346
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 09:56:29 2019 -0800

    Move some code inside the if check for table type
---
 .../validation/OfflineSegmentIntervalChecker.java  | 12 ++++++-----
 .../RealtimeSegmentValidationManager.java          | 24 ++++++++++++----------
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 8695353..592bd0a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -64,14 +64,16 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
   @Override
   protected void processTable(String tableNameWithType) {
     try {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-        return;
-      }
 
       CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
+
+        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+          return;
+        }
+
         validateOfflineSegmentPush(tableConfig);
       }
     } catch (Exception e) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 42208e7..36d7b27 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -76,22 +76,24 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
   @Override
   protected void processTable(String tableNameWithType) {
     try {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-        return;
-      }
-
       CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+
+        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+        if (tableConfig == null) {
+          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+          return;
+        }
+
         if (_updateRealtimeDocumentCount) {
           updateRealtimeDocumentCount(tableConfig);
         }
-      }
-      Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-      StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-      if (streamConfig.hasLowLevelConsumerType()) {
-        _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+        }
       }
     } catch (Exception e) {
       LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
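
Note on the hunks above: both move the table config lookup inside the table type check, and the realtime hunk also moves the stream config handling inside that check. The following is a minimal sketch of the effect on lookups, not the Pinot code itself. The class `TableTypeGuardSketch` and its helpers `typeOf`, `fetchConfig`, and `validate` are hypothetical stand-ins, and it assumes each periodic task is handed every table name regardless of type.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative only: hypothetical stand-ins, not the Pinot classes in the diff. */
final class TableTypeGuardSketch {
  enum TableType { OFFLINE, REALTIME }

  // Counts simulated config-store reads so the two orderings can be compared.
  private int configLookups = 0;

  // Simplified stand-in for deriving the type from a "_REALTIME"/"_OFFLINE" suffix.
  static TableType typeOf(String tableNameWithType) {
    return tableNameWithType.endsWith("_REALTIME") ? TableType.REALTIME : TableType.OFFLINE;
  }

  // Stand-in for the table config fetch; each call counts as one lookup.
  private String fetchConfig(String tableNameWithType) {
    configLookups++;
    return "config:" + tableNameWithType;
  }

  // Stand-in for the actual validation work; intentionally a no-op here.
  private void validate(String config) {
  }

  /** Old ordering: fetch the config first, then check the table type. */
  int lookupsWhenFetchingFirst(List<String> tables) {
    configLookups = 0;
    for (String table : tables) {
      String config = fetchConfig(table);
      if (typeOf(table) == TableType.REALTIME) {
        validate(config);
      }
    }
    return configLookups;
  }

  /** New ordering: check the table type first, fetch only for matching tables. */
  int lookupsWhenCheckingTypeFirst(List<String> tables) {
    configLookups = 0;
    for (String table : tables) {
      if (typeOf(table) == TableType.REALTIME) {
        validate(fetchConfig(table));
      }
    }
    return configLookups;
  }

  public static void main(String[] args) {
    List<String> tables = Arrays.asList("a_OFFLINE", "b_REALTIME", "c_OFFLINE");
    TableTypeGuardSketch sketch = new TableTypeGuardSketch();
    System.out.println(sketch.lookupsWhenFetchingFirst(tables));     // prints 3
    System.out.println(sketch.lookupsWhenCheckingTypeFirst(tables)); // prints 1
  }
}
```

With the type check first, a task that only handles one table type skips the lookup for every other table, and code that reads the config (such as the stream config handling in the realtime hunk) no longer runs for tables of the other type.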




[incubator-pinot] 04/08: Review comments

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 0ff4723735ed8c8cd7146d69cf10a2c716b3cd89
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 10:49:35 2019 -0800

    Review comments
---
 .../linkedin/pinot/controller/ControllerConf.java  | 31 +++++++++++++++++++++-
 .../pinot/controller/ControllerStarter.java        |  3 +--
 .../BrokerResourceValidationManager.java           |  1 -
 .../validation/OfflineSegmentIntervalChecker.java  |  5 +---
 4 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index 1b496d9..9f84691 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -57,6 +57,7 @@ public class ControllerConf extends PropertiesConfiguration {
 
   public static class ControllerPeriodicTasksConf {
     private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
+    private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
     private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
         "controller.offline.segment.interval.checker.frequencyInSeconds";
     private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -75,6 +76,7 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.segment.level.validation.intervalInSeconds";
 
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -351,6 +353,28 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(retentionFrequencyInSeconds));
   }
 
+  /**
+   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
+   * @return
+   */
+  @Deprecated
+  public int getValidationControllerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+    }
+    return ControllerPeriodicTasksConf.DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+  }
+
+  /**
+   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
+   * @return
+   */
+  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
   public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
     if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
@@ -378,12 +402,17 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(validationFrequencyInSeconds));
   }
 
+  /**
+   * Return broker resource validation frequency if present, else return the validation manager frequency
+   * This is so that we can rollout with no config changes to the frequency of this task
+   * @return
+   */
   public int getBrokerResourceValidationFrequencyInSeconds() {
     if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS;
+    return getValidationControllerFrequencyInSeconds();
   }
 
   public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 07ca195..2bca8ae 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -193,8 +193,7 @@ public class ControllerStarter {
     periodicTasks.add(_taskManager);
     periodicTasks.add(_retentionManager);
     _offlineSegmentIntervalChecker =
-        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
-            new ValidationMetrics(_metricsRegistry));
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry));
     _realtimeSegmentValidationManager =
         new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
             new ValidationMetrics(_metricsRegistry));
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
index e3264c4..1f09b64 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -41,7 +41,6 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
 
   @Override
   protected void preprocess() {
-    // Cache instance configs to reduce ZK access
     _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
   }
 
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 592bd0a..3c79205 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -29,7 +29,6 @@ import com.linkedin.pinot.common.utils.time.TimeUtils;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
-import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
@@ -46,14 +45,12 @@ import org.slf4j.LoggerFactory;
 public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
-      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+      ValidationMetrics validationMetrics) {
     super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
         pinotHelixResourceManager);
-    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
   }
 




[incubator-pinot] 01/08: Split ValidationManager duties into separate ControllerPeriodicTasks

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f7b13ea1a9b70ce604dca9dcd78891ba7b4e9f57
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 4 15:50:52 2019 -0800

    Split ValidationManager duties into separate ControllerPeriodicTasks
---
 .../linkedin/pinot/controller/ControllerConf.java  | 135 +++++++++++------
 .../pinot/controller/ControllerStarter.java        |  41 ++++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   |   4 +-
 .../helix/core/retention/RetentionManager.java     |  12 +-
 .../BrokerResourceValidationManager.java           |  80 ++++++++++
 ...ger.java => OfflineSegmentIntervalChecker.java} | 116 ++-------------
 .../RealtimeSegmentValidationManager.java          | 162 +++++++++++++++++++++
 .../helix/core/retention/RetentionManagerTest.java |  11 +-
 .../validation/ValidationManagerTest.java          |  14 +-
 .../tests/SegmentCompletionIntegrationTests.java   |   4 +-
 .../admin/command/StartControllerCommand.java      |   4 +-
 11 files changed, 406 insertions(+), 177 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index df0c2df..1b496d9 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -54,15 +54,40 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
   private static final String CONSOLE_WEBAPP_USE_HTTPS = "controller.query.console.useHttps";
   private static final String EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT = "controller.upload.onlineToOfflineTimeout";
-  private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
-  private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
-  private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
-  private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "controller.realtime.segment.relocator.frequency";
-  private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS = "controller.statuschecker.waitForPushTimeInSeconds";
+
+  public static class ControllerPeriodicTasksConf {
+    private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
+    private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
+        "controller.offline.segment.interval.checker.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.realtime.segment.validation.frequencyInSeconds";
+    private static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS =
+        "controller.broker.resource.validation.frequencyInSeconds";
+    private static final String STATUS_CHECKER_FREQUENCY_IN_SECONDS = "controller.statuschecker.frequencyInSeconds";
+    private static final String STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS =
+        "controller.statuschecker.waitForPushTimeInSeconds";
+    private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
+    private static final String REALTIME_SEGMENT_RELOCATOR_FREQUENCY =
+        "controller.realtime.segment.relocator.frequency";
+    // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
+    // separate interval
+    private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
+        "controller.segment.level.validation.intervalInSeconds";
+
+    private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
+    private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+    private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
+    private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
+    private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
+  }
+
   private static final String SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = "server.request.timeoutSeconds";
   private static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = "controller.realtime.segment.commit.timeoutSeconds";
   private static final String DELETED_SEGMENTS_RETENTION_IN_DAYS = "controller.deleted.segments.retentionInDays";
-  private static final String TASK_MANAGER_FREQUENCY_IN_SECONDS = "controller.task.frequencyInSeconds";
   private static final String TABLE_MIN_REPLICAS = "table.minReplicas";
   private static final String ENABLE_SPLIT_COMMIT = "controller.enable.split.commit";
   private static final String JERSEY_ADMIN_API_PORT = "jersey.admin.api.port";
@@ -73,25 +98,17 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final String SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = "controller.segment.upload.timeoutInMillis";
   private static final String REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = "controller.realtime.segment.metadata.commit.numLocks";
   private static final String ENABLE_STORAGE_QUOTA_CHECK = "controller.enable.storage.quota.check";
-  // Because segment level validation is expensive and requires heavy ZK access, we run segment level validation with a
-  // separate interval
-  private static final String SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS =
-      "controller.segment.level.validation.intervalInSeconds";
+
   private static final String ENABLE_BATCH_MESSAGE_MODE = "controller.enable.batch.message.mode";
 
   // Defines the kind of storage and the underlying PinotFS implementation
   private static final String PINOT_FS_FACTORY_CLASS_PREFIX = "controller.storage.factory.class";
   private static final String PINOT_FS_FACTORY_CLASS_LOCAL = "controller.storage.factory.class.file";
 
-  private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-  private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-  private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
-  private static final String DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY = "1h"; // 1 hour
-  private static final int DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS = 10 * 60; // 10 minutes
+
   private static final long DEFAULT_EXTERNAL_VIEW_ONLINE_TO_OFFLINE_TIMEOUT_MILLIS = 120_000L; // 2 minutes
   private static final int DEFAULT_SERVER_ADMIN_REQUEST_TIMEOUT_SECONDS = 30;
   private static final int DEFAULT_DELETED_SEGMENTS_RETENTION_IN_DAYS = 7;
-  private static final int DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS = -1; // Disabled
   private static final int DEFAULT_TABLE_MIN_REPLICAS = 1;
   private static final boolean DEFAULT_ENABLE_SPLIT_COMMIT = false;
   private static final int DEFAULT_JERSEY_ADMIN_PORT = 21000;
@@ -101,7 +118,6 @@ public class ControllerConf extends PropertiesConfiguration {
   private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
   private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
   private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = true;
-  private static final int DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS = 24 * 60 * 60;
 
   private static final String DEFAULT_PINOT_FS_FACTORY_CLASS_LOCAL = LocalPinotFS.class.getName();
 
@@ -324,58 +340,91 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getRetentionControllerFrequencyInSeconds() {
-    if (containsKey(RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setRetentionControllerFrequencyInSeconds(int retentionFrequencyInSeconds) {
-    setProperty(RETENTION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(retentionFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.RETENTION_MANAGER_FREQUENCY_IN_SECONDS,
+        Integer.toString(retentionFrequencyInSeconds));
+  }
+
+  public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
+    }
+    return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+  }
+
+  public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
+  }
+
+  public int getRealtimeSegmentValidationFrequencyInSeconds() {
+
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
+    }
+    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+  }
+
+  public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+  public int getBrokerResourceValidationFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS;
   }
 
-  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
-    setProperty(VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(validationFrequencyInSeconds));
+  public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
+    setProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS,
+        Integer.toString(validationFrequencyInSeconds));
   }
 
   public int getStatusCheckerFrequencyInSeconds() {
-    if (containsKey(STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS)) {
+      return Integer.parseInt((String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   public void setStatusCheckerFrequencyInSeconds(int statusCheckerFrequencyInSeconds) {
-    setProperty(STATUS_CHECKER_FREQUENCY_IN_SECONDS, Integer.toString(statusCheckerFrequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_FREQUENCY_IN_SECONDS,
+        Integer.toString(statusCheckerFrequencyInSeconds));
   }
 
   public String getRealtimeSegmentRelocatorFrequency() {
-    if (containsKey(REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
-      return (String) getProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
+    if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY)) {
+      return (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY);
     }
-    return DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
+    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_RELOCATOR_FREQUENCY;
   }
 
   public void setRealtimeSegmentRelocatorFrequency(String relocatorFrequency) {
-    setProperty(REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
+    setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATOR_FREQUENCY, relocatorFrequency);
   }
 
   public int getStatusCheckerWaitForPushTimeInSeconds() {
-    if (containsKey(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
-      return Integer.parseInt((String) getProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
+    if (containsKey(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS)) {
+      return Integer.parseInt(
+          (String) getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS));
     }
-    return DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEFAULT_STATUS_CONTROLLER_WAIT_FOR_PUSH_TIME_IN_SECONDS;
   }
 
   public void setStatusCheckerWaitForPushTimeInSeconds(int statusCheckerWaitForPushTimeInSeconds) {
-    setProperty(STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS, Integer.toString(statusCheckerWaitForPushTimeInSeconds));
+    setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_WAIT_FOR_PUSH_TIME_IN_SECONDS,
+        Integer.toString(statusCheckerWaitForPushTimeInSeconds));
   }
 
   public long getExternalViewOnlineToOfflineTimeout() {
@@ -417,11 +466,12 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getTaskManagerFrequencyInSeconds() {
-    return getInt(TASK_MANAGER_FREQUENCY_IN_SECONDS, DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_TASK_MANAGER_FREQUENCY_IN_SECONDS);
   }
 
   public void setTaskManagerFrequencyInSeconds(int frequencyInSeconds) {
-    setProperty(TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
+    setProperty(ControllerPeriodicTasksConf.TASK_MANAGER_FREQUENCY_IN_SECONDS, Integer.toString(frequencyInSeconds));
   }
 
   public int getDefaultTableMinReplicas() {
@@ -469,6 +519,7 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public int getSegmentLevelValidationIntervalInSeconds() {
-    return getInt(SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
+    return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS);
   }
 }
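
Note on the ControllerConf getters above: each frequency getter follows the same shape, which is to return the parsed property if the key is present and the task-specific default otherwise. The sketch below illustrates only that pattern. It is a stand-in built on java.util.Properties rather than the PropertiesConfiguration base class, and the class name, key string, and default value are hypothetical, not taken from ControllerPeriodicTasksConf.

```java
import java.util.Properties;

/** Hypothetical stand-in for ControllerConf; not the Pinot class. */
final class FrequencyConfSketch {
  // Hypothetical key and default, chosen only for illustration.
  static final String BROKER_RESOURCE_VALIDATION_FREQUENCY_KEY =
      "example.broker.resource.validation.frequencyInSeconds";
  static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 3600;

  private final Properties _properties = new Properties();

  /** Returns the configured frequency, or the default when the key is absent. */
  int getBrokerResourceValidationFrequencyInSeconds() {
    String value = _properties.getProperty(BROKER_RESOURCE_VALIDATION_FREQUENCY_KEY);
    if (value != null) {
      return Integer.parseInt(value);
    }
    return DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS;
  }

  /** Stores the frequency as a string, mirroring the setters in the diff. */
  void setBrokerResourceValidationFrequencyInSeconds(int frequencyInSeconds) {
    _properties.setProperty(BROKER_RESOURCE_VALIDATION_FREQUENCY_KEY, Integer.toString(frequencyInSeconds));
  }
}
```

Storing the value as a string and parsing it on read keeps the setter and a hand-edited properties file interchangeable, at the cost of a NumberFormatException if the file holds a non-numeric value.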
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
index 2799b44..07ca195 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerStarter.java
@@ -41,7 +41,9 @@ import com.linkedin.pinot.controller.helix.core.realtime.PinotRealtimeSegmentMan
 import com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategyFactory;
 import com.linkedin.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator;
 import com.linkedin.pinot.controller.helix.core.retention.RetentionManager;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.BrokerResourceValidationManager;
+import com.linkedin.pinot.controller.validation.OfflineSegmentIntervalChecker;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.core.crypt.PinotCrypterFactory;
 import com.linkedin.pinot.core.periodictask.PeriodicTask;
 import com.linkedin.pinot.filesystem.PinotFSFactory;
@@ -86,7 +88,9 @@ public class ControllerStarter {
   private final ControllerPeriodicTaskScheduler _controllerPeriodicTaskScheduler;
 
   // Can only be constructed after resource manager getting started
-  private ValidationManager _validationManager;
+  private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
+  private RealtimeSegmentValidationManager _realtimeSegmentValidationManager;
+  private BrokerResourceValidationManager _brokerResourceValidationManager;
   private RealtimeSegmentRelocator _realtimeSegmentRelocator;
   private PinotHelixTaskResourceManager _helixTaskResourceManager;
   private PinotTaskManager _taskManager;
@@ -95,8 +99,7 @@ public class ControllerStarter {
     _config = conf;
     _adminApp = new ControllerAdminApiApplication(_config.getQueryConsoleWebappPath(), _config.getQueryConsoleUseHttps());
     _helixResourceManager = new PinotHelixResourceManager(_config);
-    _retentionManager = new RetentionManager(_helixResourceManager, _config.getRetentionControllerFrequencyInSeconds(),
-        _config.getDeletedSegmentsRetentionInDays());
+    _retentionManager = new RetentionManager(_helixResourceManager, _config);
     _metricsRegistry = new MetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_metricsRegistry);
     _realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager);
@@ -111,8 +114,16 @@ public class ControllerStarter {
     return _helixResourceManager;
   }
 
-  public ValidationManager getValidationManager() {
-    return _validationManager;
+  public OfflineSegmentIntervalChecker getOfflineSegmentIntervalChecker() {
+    return _offlineSegmentIntervalChecker;
+  }
+
+  public RealtimeSegmentValidationManager getRealtimeSegmentValidationManager() {
+    return _realtimeSegmentValidationManager;
+  }
+
+  public BrokerResourceValidationManager getBrokerResourceValidationManager() {
+    return _brokerResourceValidationManager;
   }
 
   public PinotHelixTaskResourceManager getHelixTaskResourceManager() {
@@ -181,10 +192,18 @@ public class ControllerStarter {
     _taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
     periodicTasks.add(_taskManager);
     periodicTasks.add(_retentionManager);
-    _validationManager =
-        new ValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+    _offlineSegmentIntervalChecker =
+        new OfflineSegmentIntervalChecker(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
+            new ValidationMetrics(_metricsRegistry));
+    _realtimeSegmentValidationManager =
+        new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
             new ValidationMetrics(_metricsRegistry));
-    periodicTasks.add(_validationManager);
+    _brokerResourceValidationManager =
+        new BrokerResourceValidationManager(_config, _helixResourceManager);
+
+    periodicTasks.add(_offlineSegmentIntervalChecker);
+    periodicTasks.add(_realtimeSegmentValidationManager);
+    periodicTasks.add(_brokerResourceValidationManager);
     periodicTasks.add(_segmentStatusChecker);
     periodicTasks.add(_realtimeSegmentRelocator);
 
@@ -351,7 +370,9 @@ public class ControllerStarter {
     conf.setControllerVipHost("localhost");
     conf.setControllerVipProtocol("http");
     conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-    conf.setValidationControllerFrequencyInSeconds(3600);
+    conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+    conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+    conf.setBrokerResourceValidationFrequencyInSeconds(3600);
     conf.setStatusCheckerFrequencyInSeconds(5 * 60);
     conf.setRealtimeSegmentRelocatorFrequency("1h");
     conf.setStatusCheckerWaitForPushTimeInSeconds(10 * 60);
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9860ec1..e708abc 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -818,8 +818,8 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * An instance is reporting that it has stopped consuming a topic due to some error.
    * Mark the state of the segment to be OFFLINE in idealstate.
-   * When all replicas of this segment are marked offline, the ValidationManager, in its next
-   * run, will auto-create a new segment with the appropriate offset.
+   * When all replicas of this segment are marked offline, the {@link com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager},
+   * in its next run, will auto-create a new segment with the appropriate offset.
    */
   public void segmentStoppedConsuming(final LLCSegmentName segmentName, final String instance) {
     String rawTableName = segmentName.getTableName();
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index ec15f28..d0e0b2e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import com.linkedin.pinot.controller.helix.core.retention.strategy.RetentionStrategy;
@@ -52,10 +53,9 @@ public class RetentionManager extends ControllerPeriodicTask {
 
   private final int _deletedSegmentsRetentionInDays;
 
-  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, int runFrequencyInSeconds,
-      int deletedSegmentsRetentionInDays) {
-    super("RetentionManager", runFrequencyInSeconds, pinotHelixResourceManager);
-    _deletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
+  public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+    super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(), pinotHelixResourceManager);
+    _deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
 
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
         getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
@@ -148,7 +148,7 @@ public class RetentionManager extends ControllerPeriodicTask {
         // In progress segment, only check LLC segment
         if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
           // Delete old LLC segment that hangs around. Do not delete segment that are current since there may be a race
-          // with ValidationManager trying to auto-create the LLC segment
+          // with RealtimeSegmentValidationManager trying to auto-create the LLC segment
           if (shouldDeleteInProgressLLCSegment(segmentName, idealState, realtimeSegmentZKMetadata)) {
             segmentsToDelete.add(segmentName);
           }
@@ -172,7 +172,7 @@ public class RetentionManager extends ControllerPeriodicTask {
       return false;
     }
     // delete a segment only if it is old enough (5 days) or else,
-    // 1. latest segment could get deleted in the middle of repair by ValidationManager
+    // 1. latest segment could get deleted in the middle of repair by RealtimeSegmentValidationManager
     // 2. for a brand new segment, if this code kicks in after new metadata is created but ideal state entry is not yet created (between step 2 and 3),
     // the latest segment metadata could get marked for deletion
     if (System.currentTimeMillis() - realtimeSegmentZKMetadata.getCreationTime()
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
new file mode 100644
index 0000000..e3264c4
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Rebuilds the broker resource if the instance set has changed
+ */
+public class BrokerResourceValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerResourceValidationManager.class);
+
+  private List<InstanceConfig> _instanceConfigs;
+
+  public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+    super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Cache instance configs to reduce ZK access
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+        return;
+      }
+
+      // Rebuild broker resource
+      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+          tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+    }
+  }
+
+
+  @Override
+  protected void postprocess() {
+
+  }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+  }
+}
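
Note on BrokerResourceValidationManager above: the task loads the instance configs once in preprocess() and reuses them for every processTable() call. The sketch below shows that per-cycle caching under the assumption that the base class calls preprocess(), then processTable() per table, then postprocess(). The ControllerPeriodicTask implementation is not part of this diff, so the base class here is hypothetical, as are all names and the broker instance strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical base class; the real ControllerPeriodicTask is not shown in this diff. */
abstract class PeriodicTaskSketch {
  /** Runs one cycle: shared setup, then each table, then shared teardown. */
  final void runOnce(List<String> tableNamesWithType) {
    preprocess();
    for (String tableNameWithType : tableNamesWithType) {
      processTable(tableNameWithType);
    }
    postprocess();
  }

  protected abstract void preprocess();

  protected abstract void processTable(String tableNameWithType);

  protected abstract void postprocess();
}

/** Hypothetical task that loads shared state once per cycle and reuses it for every table. */
final class CachingTaskSketch extends PeriodicTaskSketch {
  int _loadCount = 0;
  final List<String> _processed = new ArrayList<>();
  private List<String> _cachedInstances;

  @Override
  protected void preprocess() {
    // Stand-in for the single ZK read done per cycle in the diff.
    _loadCount++;
    _cachedInstances = Arrays.asList("broker_1", "broker_2");
  }

  @Override
  protected void processTable(String tableNameWithType) {
    // Uses the cached list instead of reloading it for each table.
    _processed.add(tableNameWithType + ":" + _cachedInstances.size());
  }

  @Override
  protected void postprocess() {
  }
}
```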
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
similarity index 65%
rename from pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
rename to pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index c5528e7..e952437 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -19,29 +19,20 @@
 package com.linkedin.pinot.controller.validation;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import com.linkedin.pinot.common.config.TableConfig;
 import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
 import com.linkedin.pinot.common.utils.CommonConstants;
-import com.linkedin.pinot.common.utils.HLCSegmentName;
-import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import com.linkedin.pinot.core.realtime.stream.StreamConfig;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.helix.model.InstanceConfig;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.slf4j.Logger;
@@ -52,53 +43,26 @@ import org.slf4j.LoggerFactory;
  * Manages the segment validation metrics, to ensure that all offline segments are contiguous (no missing segments) and
  * that the offline push delay isn't too high.
  */
-public class ValidationManager extends ControllerPeriodicTask {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ValidationManager.class);
+public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  private final int _segmentLevelValidationIntervalInSeconds;
-  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
-  private final ValidationMetrics _validationMetrics;
+  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  protected final ValidationMetrics _validationMetrics;
 
-  private long _lastSegmentLevelValidationTimeMs = 0L;
-  private boolean _runSegmentLevelValidation;
-  private List<InstanceConfig> _instanceConfigs;
-
-  public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+  public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
-    super("ValidationManager", config.getValidationControllerFrequencyInSeconds(), pinotHelixResourceManager);
-    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
-    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+    super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
+        pinotHelixResourceManager);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
   }
 
   @Override
   protected void preprocess() {
-    // Run segment level validation using a separate interval
-    _runSegmentLevelValidation = false;
-    long currentTimeMs = System.currentTimeMillis();
-    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
-        >= _segmentLevelValidationIntervalInSeconds) {
-      LOGGER.info("Run segment-level validation");
-      _runSegmentLevelValidation = true;
-      _lastSegmentLevelValidationTimeMs = currentTimeMs;
-    }
-
-    // Cache instance configs to reduce ZK access
-    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
-    runValidation(tableNameWithType);
-  }
-
-  @Override
-  protected void postprocess() {
-
-  }
-
-  private void runValidation(String tableNameWithType) {
     try {
       TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
       if (tableConfig == null) {
@@ -106,29 +70,12 @@ public class ValidationManager extends ControllerPeriodicTask {
         return;
       }
 
-      // Rebuild broker resource
-      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
-          tableConfig.getTenantConfig().getBroker());
-      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
-
-      // Perform validation based on the table type
       CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
       if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-        if (_runSegmentLevelValidation) {
-          validateOfflineSegmentPush(tableConfig);
-        }
-      } else {
-        if (_runSegmentLevelValidation) {
-          updateRealtimeDocumentCount(tableConfig);
-        }
-        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-        if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
-        }
+        validateOfflineSegmentPush(tableConfig);
       }
     } catch (Exception e) {
-      LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
+      LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
     }
   }
 
@@ -264,50 +211,9 @@ public class ValidationManager extends ControllerPeriodicTask {
     return numTotalDocs;
   }
 
-  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
-    String realtimeTableName = tableConfig.getTableName();
-    List<RealtimeSegmentZKMetadata> metadataList =
-        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC segments (i.e. fully migrated)
-    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
-    if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
-  }
-
-  @VisibleForTesting
-  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
-      boolean countHLCSegments) {
-    long numTotalDocs = 0;
-
-    String groupId = "";
-    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
-      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
-        if (countHLCSegments) {
-          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-          String segmentGroupIdName = hlcSegmentName.getGroupId();
-
-          if (groupId.isEmpty()) {
-            groupId = segmentGroupIdName;
-          }
-          // Discard all segments with different groupids as they are replicas
-          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
-            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-          }
-        }
-      } else {
-        // Low level segments
-        if (!countHLCSegments) {
-          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
-        }
-      }
-    }
+  @Override
+  protected void postprocess() {
 
-    return numTotalDocs;
   }
 
   @Override
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
new file mode 100644
index 0000000..71e50d2
--- /dev/null
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -0,0 +1,162 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.controller.validation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.linkedin.pinot.common.config.TableConfig;
+import com.linkedin.pinot.common.config.TableNameBuilder;
+import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import com.linkedin.pinot.common.metrics.ValidationMetrics;
+import com.linkedin.pinot.common.utils.CommonConstants;
+import com.linkedin.pinot.common.utils.HLCSegmentName;
+import com.linkedin.pinot.common.utils.SegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
+import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
+import com.linkedin.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import com.linkedin.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import com.linkedin.pinot.core.realtime.stream.StreamConfig;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Validates realtime ideal states and segment metadata, fixing any partitions which have stopped consuming
+ */
+public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+
+  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  protected final ValidationMetrics _validationMetrics;
+
+  private final int _segmentLevelValidationIntervalInSeconds;
+  private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;
+  private boolean _updateRealtimeDocumentCount;
+
+  public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+      PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+    super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
+        pinotHelixResourceManager);
+    _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
+    _validationMetrics = validationMetrics;
+
+    _segmentLevelValidationIntervalInSeconds = config.getSegmentLevelValidationIntervalInSeconds();
+    Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
+  }
+
+  @Override
+  protected void preprocess() {
+    // Update realtime document counts only if enough time has passed since the previous update
+    _updateRealtimeDocumentCount = false;
+    long currentTimeMs = System.currentTimeMillis();
+    if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastUpdateRealtimeDocumentCountTimeMs)
+        >= _segmentLevelValidationIntervalInSeconds) {
+      LOGGER.info("Updating realtime document counts");
+      _updateRealtimeDocumentCount = true;
+      _lastUpdateRealtimeDocumentCountTimeMs = currentTimeMs;
+    }
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+
+      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+        if (_updateRealtimeDocumentCount) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
+    }
+  }
+
+  private void updateRealtimeDocumentCount(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+    List<RealtimeSegmentZKMetadata> metadataList =
+        _pinotHelixResourceManager.getRealtimeSegmentMetadata(realtimeTableName);
+    boolean countHLCSegments = true;  // false if this table has ONLY LLC segments (i.e. fully migrated)
+    StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
+    if (streamConfig.hasLowLevelConsumerType() && !streamConfig.hasHighLevelConsumerType()) {
+      countHLCSegments = false;
+    }
+    // Update the gauge to contain the total document count in the segments
+    _validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
+        computeRealtimeTotalDocumentInSegments(metadataList, countHLCSegments));
+  }
+
+  @VisibleForTesting
+  static long computeRealtimeTotalDocumentInSegments(List<RealtimeSegmentZKMetadata> realtimeSegmentZKMetadataList,
+      boolean countHLCSegments) {
+    long numTotalDocs = 0;
+
+    String groupId = "";
+    for (RealtimeSegmentZKMetadata realtimeSegmentZKMetadata : realtimeSegmentZKMetadataList) {
+      String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+        if (countHLCSegments) {
+          HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
+          String segmentGroupIdName = hlcSegmentName.getGroupId();
+
+          if (groupId.isEmpty()) {
+            groupId = segmentGroupIdName;
+          }
+          // Discard all segments with different groupids as they are replicas
+          if (groupId.equals(segmentGroupIdName) && realtimeSegmentZKMetadata.getTotalRawDocs() >= 0) {
+            numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+          }
+        }
+      } else {
+        // Low level segments
+        if (!countHLCSegments) {
+          numTotalDocs += realtimeSegmentZKMetadata.getTotalRawDocs();
+        }
+      }
+    }
+
+    return numTotalDocs;
+  }
+
+  @Override
+  protected void postprocess() {
+
+  }
+
+  @Override
+  protected void initTask() {
+
+  }
+
+  @Override
+  public void stopTask() {
+    LOGGER.info("Unregister all the validation metrics.");
+    _validationMetrics.unregisterAllMetrics();
+  }
+}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
index 3e60350..d2fe9b3 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -26,6 +26,7 @@ import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.segment.SegmentMetadata;
 import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.PinotHelixResourceManager;
 import com.linkedin.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import com.linkedin.pinot.controller.helix.core.SegmentDeletionManager;
@@ -87,7 +88,10 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
@@ -204,7 +208,10 @@ public class RetentionManagerTest {
         setupSegmentMetadata(tableConfig, now, initialNumSegments, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
 
-    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, 0, 0);
+    ControllerConf conf = new ControllerConf();
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+    RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
     retentionManager.init();
     retentionManager.run();
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
index 0526afb..f69657b 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/validation/ValidationManagerTest.java
@@ -52,7 +52,7 @@ import static org.testng.Assert.*;
 
 
 /**
- * Tests for the ValidationManager.
+ * Tests for the ValidationManagers.
  */
 public class ValidationManagerTest {
   private String HELIX_CLUSTER_NAME = "TestValidationManager";
@@ -178,7 +178,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(
         SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName4, 20));
 
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, true), 60);
 
     // Now add some low level segment names
     String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 1000).getSegmentName();
@@ -188,7 +188,7 @@ public class ValidationManagerTest {
     segmentZKMetadataList.add(SegmentMetadataMockUtils.mockRealtimeSegmentZKMetadata(TEST_TABLE_NAME, segmentName6, 5));
 
     // Only the LLC segments should get counted.
-    assertEquals(ValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
+    assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentZKMetadataList, false), 15);
   }
 
   @AfterClass
@@ -210,18 +210,18 @@ public class ValidationManagerTest {
     jan1st2nd3rd.add(jan1st);
     jan1st2nd3rd.add(jan2nd);
     jan1st2nd3rd.add(jan3rd);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd, Duration.standardDays(1)), 0);
 
     ArrayList<Interval> jan1st2nd3rd5th = new ArrayList<>(jan1st2nd3rd);
     jan1st2nd3rd5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd3rd5th, Duration.standardDays(1)), 1);
 
     // Should also work if the intervals are in random order
     ArrayList<Interval> jan5th2nd1st = new ArrayList<>();
     jan5th2nd1st.add(jan5th);
     jan5th2nd1st.add(jan2nd);
     jan5th2nd1st.add(jan1st);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan5th2nd1st, Duration.standardDays(1)), 2);
 
     // Should also work if the intervals are of different sizes
     Interval jan1stAnd2nd = new Interval(new DateTime(2015, 1, 1, 0, 0, 0), new DateTime(2015, 1, 2, 23, 59, 59));
@@ -229,6 +229,6 @@ public class ValidationManagerTest {
     jan1st2nd4th5th.add(jan1stAnd2nd);
     jan1st2nd4th5th.add(jan4th);
     jan1st2nd4th5th.add(jan5th);
-    assertEquals(ValidationManager.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
+    assertEquals(OfflineSegmentIntervalChecker.computeNumMissingSegments(jan1st2nd4th5th, Duration.standardDays(1)), 1);
   }
 }
diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
index 28eb463..5ef1278 100644
--- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/SegmentCompletionIntegrationTests.java
@@ -28,7 +28,7 @@ import com.linkedin.pinot.common.utils.LLCSegmentName;
 import com.linkedin.pinot.common.utils.NetUtil;
 import com.linkedin.pinot.common.utils.ZkStarter;
 import com.linkedin.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import com.linkedin.pinot.controller.validation.ValidationManager;
+import com.linkedin.pinot.controller.validation.RealtimeSegmentValidationManager;
 import com.linkedin.pinot.server.realtime.ControllerLeaderLocator;
 import com.linkedin.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import com.linkedin.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
@@ -155,7 +155,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra
     final String oldSegment = _currentSegment;
 
     // Now call the validation manager, and the segment should fix itself
-    ValidationManager validationManager = _controllerStarter.getValidationManager();
+    RealtimeSegmentValidationManager validationManager = _controllerStarter.getRealtimeSegmentValidationManager();
     validationManager.init();
     validationManager.run();
 
diff --git a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
index 49c56b1..aa82182 100644
--- a/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
+++ b/pinot-tools/src/main/java/com/linkedin/pinot/tools/admin/command/StartControllerCommand.java
@@ -150,7 +150,9 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements
         conf.setTenantIsolationEnabled(_tenantIsolation);
 
         conf.setRetentionControllerFrequencyInSeconds(3600 * 6);
-        conf.setValidationControllerFrequencyInSeconds(3600);
+        conf.setOfflineSegmentIntervalCheckerFrequencyInSeconds(3600);
+        conf.setRealtimeSegmentValidationFrequencyInSeconds(3600);
+        conf.setBrokerResourceValidationFrequencyInSeconds(3600);
       }
 
       LOGGER.info("Executing command: " + toString());
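
For readers following the move of computeRealtimeTotalDocumentInSegments into RealtimeSegmentValidationManager, here is a minimal standalone sketch of the counting rule shown in the diff above. SegmentInfo and DocCountSketch are hypothetical stand-ins for illustration, not Pinot classes. The sketch assumes the segment type and group id are already known, whereas the real code derives them from the segment name.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for RealtimeSegmentZKMetadata: only the fields the counting rule reads.
final class SegmentInfo {
  final boolean highLevel;   // true for an HLC segment, false for an LLC segment
  final String groupId;      // HLC consumer group id; ignored for LLC segments
  final long totalRawDocs;

  SegmentInfo(boolean highLevel, String groupId, long totalRawDocs) {
    this.highLevel = highLevel;
    this.groupId = groupId;
    this.totalRawDocs = totalRawDocs;
  }
}

final class DocCountSketch {
  // Counts either HLC or LLC documents, never both, mirroring the countHLCSegments flag.
  static long countDocs(List<SegmentInfo> segments, boolean countHLCSegments) {
    long total = 0;
    String firstGroupId = "";
    for (SegmentInfo segment : segments) {
      if (segment.highLevel) {
        if (!countHLCSegments) {
          continue;
        }
        // The first group id seen wins; other groups are replicas and are skipped.
        if (firstGroupId.isEmpty()) {
          firstGroupId = segment.groupId;
        }
        if (firstGroupId.equals(segment.groupId) && segment.totalRawDocs >= 0) {
          total += segment.totalRawDocs;
        }
      } else if (!countHLCSegments) {
        // LLC segments are counted only when HLC segments are not.
        total += segment.totalRawDocs;
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<SegmentInfo> segments = Arrays.asList(
        new SegmentInfo(true, "group0", 10),
        new SegmentInfo(true, "group0", 20),
        new SegmentInfo(true, "group1", 10),  // replica of group0, skipped
        new SegmentInfo(false, null, 7),
        new SegmentInfo(false, null, 5));
    System.out.println(countDocs(segments, true));   // prints 30
    System.out.println(countDocs(segments, false));  // prints 12
  }
}
```

The two modes are mutually exclusive by design: a table with only low level consumers reports LLC documents, and any other table reports the documents of a single HLC group.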




[incubator-pinot] 05/08: Use controller validation frequency as default values for new periodic validation tasks

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 75259146d0c219e2372abdb6d18fe759cc1a49e4
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 18:31:25 2019 -0800

    Use controller validation frequency as default values for new periodic validation tasks
---
 .../linkedin/pinot/controller/ControllerConf.java  | 45 +++++++++++-----------
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  6 +--
 .../RealtimeSegmentValidationManager.java          |  2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 26 ++++++-------
 4 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index 9f84691..c6ff82e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -57,7 +57,9 @@ public class ControllerConf extends PropertiesConfiguration {
 
   public static class ControllerPeriodicTasksConf {
     private static final String RETENTION_MANAGER_FREQUENCY_IN_SECONDS = "controller.retention.frequencyInSeconds";
-    private static final String VALIDATION_MANAGER_FREQUENCY_IN_SECONDS = "controller.validation.frequencyInSeconds";
+    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+    private static final String DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS =
+        "controller.validation.frequencyInSeconds";
     private static final String OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS =
         "controller.offline.segment.interval.checker.frequencyInSeconds";
     private static final String REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS =
@@ -76,7 +78,8 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.segment.level.validation.intervalInSeconds";
 
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-    private static final int DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
+    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
+    private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
@@ -353,34 +356,26 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(retentionFrequencyInSeconds));
   }
 
-  /**
-   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
-   * @return
-   */
-  @Deprecated
-  public int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
+  private int getValidationControllerFrequencyInSeconds() {
+    if (containsKey(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
-          (String) getProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
+          (String) getProperty(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
+    return ControllerPeriodicTasksConf.DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
   }
 
   /**
-   * Deprecated. The validation manager has been split into 3 separate tasks, each having their own frequency config
+   * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the segment level validation interval. This is done in order to retain the current behavior,
+   * wherein the offline validation tasks were done at segment level validation interval frequency
    * @return
    */
-  public void setValidationControllerFrequencyInSeconds(int validationFrequencyInSeconds) {
-    setProperty(ControllerPeriodicTasksConf.VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
-        Integer.toString(validationFrequencyInSeconds));
-  }
-
   public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
     if (containsKey(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS;
+    return getSegmentLevelValidationIntervalInSeconds();
   }
 
   public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -388,13 +383,18 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(validationFrequencyInSeconds));
   }
 
+  /**
+   * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the realtime validation tasks were done at validation controller frequency
+   * @return
+   */
   public int getRealtimeSegmentValidationFrequencyInSeconds() {
-
     if (containsKey(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS)) {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS;
+    return getValidationControllerFrequencyInSeconds();
   }
 
   public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -403,8 +403,9 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   /**
-   * Return broker resource validation frequency if present, else return the validation manager frequency
-   * This is so that we can rollout with no config changes to the frequency of this task
+   * Returns the config value for controller.broker.resource.validation.frequencyInSeconds if it exists.
+   * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
+   * wherein the broker resource validation tasks were done at validation controller frequency
    * @return
    */
   public int getBrokerResourceValidationFrequencyInSeconds() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e708abc..a77750e 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -950,7 +950,7 @@ public class PinotLLCRealtimeSegmentManager {
    *
    * TODO: We need to find a place to detect and update a gauge for nonConsumingPartitionsCount for a table, and reset it to 0 at the end of validateLLC
    */
-  public void validateLLCSegments(final TableConfig tableConfig) {
+  public void ensureAllPartitionsConsuming(final TableConfig tableConfig) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
     final int partitionCount = getPartitionCount(streamConfig);
@@ -958,7 +958,7 @@ public class PinotLLCRealtimeSegmentManager {
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
-        return validateLLCSegments(tableConfig, idealState, partitionCount);
+        return ensureAllPartitionsConsuming(tableConfig, idealState, partitionCount);
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
   }
@@ -1078,7 +1078,7 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: split this method into multiple smaller methods
    */
   @VisibleForTesting
-  protected IdealState validateLLCSegments(final TableConfig tableConfig, IdealState idealState,
+  protected IdealState ensureAllPartitionsConsuming(final TableConfig tableConfig, IdealState idealState,
       final int partitionCount) {
     final String tableNameWithType = tableConfig.getTableName();
     final StreamConfig streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 36d7b27..77ed13a 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -92,7 +92,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
         Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
         StreamConfig streamConfig = new StreamConfig(streamConfigMap);
         if (streamConfig.hasLowLevelConsumerType()) {
-          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
+          _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
         }
       }
     } catch (Exception e) {
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 36f3386..fd417a5 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -369,7 +369,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       oldMetadataMap.put(entry.getKey(), new LLCRealtimeSegmentZKMetadata(entry.getValue().toZNRecord()));
     }
     segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
-    IdealState updatedIdealState = segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    IdealState updatedIdealState = segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     Map<String, Map<String, String>> updatedMapFields = updatedIdealState.getRecord().getMapFields();
     Map<String, LLCRealtimeSegmentZKMetadata> updatedMetadataMap = segmentManager._metadataMap;
 
@@ -525,13 +525,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate that all entries in oldMapFields are unchanged in new ideal state
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(llcSegmentName.getSegmentName()));
@@ -572,13 +572,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
           if (tooSoonToCorrect) {
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
             // validate nothing changed and try again with disabled
             verifyNoChangeToOldEntries(oldMapFields, idealState);
             segmentManager.tooSoonToCorrect = false;
           }
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           // verify that new segment gets created in ideal state with CONSUMING and old segment ONLINE
           Assert.assertNotNull(idealState.getRecord().getMapFields().get(latestSegment.getSegmentName()).values().
@@ -615,7 +615,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
               (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
           Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-          segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+          segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
           verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
         } else {
@@ -644,12 +644,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
             Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
             if (tooSoonToCorrect) {
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
               // validate nothing changed and try again with disabled
               verifyNoChangeToOldEntries(oldMapFields, idealState);
               segmentManager.tooSoonToCorrect = false;
             }
-            segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+            segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
             verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
           } else {
@@ -677,13 +677,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
               if (tooSoonToCorrect) {
-                segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+                segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
                 // validate nothing changed and try again with disabled
                 verifyNoChangeToOldEntries(oldMapFields, idealState);
                 segmentManager.tooSoonToCorrect = false;
               }
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               verifyRepairs(tableConfig, idealState, expectedPartitionAssignment, segmentManager, oldMapFields);
             } else {
@@ -695,7 +695,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
                   (ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
               Map<String, Map<String, String>> oldMapFields = idealStateCopy.getRecord().getMapFields();
 
-              segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+              segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
 
               // verify that nothing changed
               verifyNoChangeToOldEntries(oldMapFields, idealState);
@@ -830,7 +830,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
 
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     PartitionAssignment partitionAssignment =
         segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
             idealState);
@@ -845,7 +845,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       FakePinotLLCRealtimeSegmentManager segmentManager, TableConfig tableConfig, int nPartitions) {
     IdealState idealState = idealStateBuilder.clear().build();
     segmentManager._metadataMap.clear();
-    segmentManager.validateLLCSegments(tableConfig, idealState, nPartitions);
+    segmentManager.ensureAllPartitionsConsuming(tableConfig, idealState, nPartitions);
     return idealStateBuilder.build();
   }
 




[incubator-pinot] 02/08: Change access modifier of some fields

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f5095586990ef585cee7b9d3b957f2ff9f52dace
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Jan 4 16:02:30 2019 -0800

    Change access modifier of some fields
---
 .../pinot/controller/validation/OfflineSegmentIntervalChecker.java    | 4 ++--
 .../pinot/controller/validation/RealtimeSegmentValidationManager.java | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index e952437..8695353 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -46,8 +46,8 @@ import org.slf4j.LoggerFactory;
 public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
 
-  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
-  protected final ValidationMetrics _validationMetrics;
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final ValidationMetrics _validationMetrics;
 
   public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 71e50d2..42208e7 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -42,8 +42,8 @@ import org.slf4j.LoggerFactory;
 public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
 
-  protected final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
-  protected final ValidationMetrics _validationMetrics;
+  private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
+  private final ValidationMetrics _validationMetrics;
 
   private final int _segmentLevelValidationIntervalInSeconds;
   private long _lastUpdateRealtimeDocumentCountTimeMs = 0L;




[incubator-pinot] 07/08: Using new defaults for each periodic task

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 01fdea1f8b456a98aa70d1abc5074f242ba03277
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Jan 8 10:02:52 2019 -0800

    Using new defaults for each periodic task
---
 .../linkedin/pinot/controller/ControllerConf.java  | 26 +++++++++-------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index b0692a9..863f3cc 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -78,9 +78,7 @@ public class ControllerConf extends PropertiesConfiguration {
         "controller.segment.level.validation.intervalInSeconds";
 
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
-    @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
-    private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes
@@ -356,18 +354,11 @@ public class ControllerConf extends PropertiesConfiguration {
         Integer.toString(retentionFrequencyInSeconds));
   }
 
-  private int getValidationControllerFrequencyInSeconds() {
-    if (containsKey(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS)) {
-      return Integer.parseInt(
-          (String) getProperty(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS));
-    }
-    return ControllerPeriodicTasksConf.DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS;
-  }
-
   /**
    * Returns the config value for controller.offline.segment.interval.checker.frequencyInSeconds if it exists.
    * If it doesn't exist, returns the segment level validation interval. This is done in order to retain the current behavior,
    * wherein the offline validation tasks were done at segment level validation interval frequency
+   * The default value is the new DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS
    * @return
    */
   public int getOfflineSegmentIntervalCheckerFrequencyInSeconds() {
@@ -375,7 +366,8 @@ public class ControllerConf extends PropertiesConfiguration {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS));
     }
-    return getSegmentLevelValidationIntervalInSeconds();
+    return getInt(ControllerPeriodicTasksConf.SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS);
   }
 
   public void setOfflineSegmentIntervalCheckerFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -387,6 +379,7 @@ public class ControllerConf extends PropertiesConfiguration {
    * Returns the config value for controller.realtime.segment.validation.frequencyInSeconds if it exists.
    * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
    * wherein the realtime validation tasks were done at validation controller frequency
+   * The default value is the new DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS
    * @return
    */
   public int getRealtimeSegmentValidationFrequencyInSeconds() {
@@ -394,7 +387,8 @@ public class ControllerConf extends PropertiesConfiguration {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return getValidationControllerFrequencyInSeconds();
+    return getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS);
   }
 
   public void setRealtimeSegmentValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
@@ -405,7 +399,8 @@ public class ControllerConf extends PropertiesConfiguration {
   /**
    * Returns the config value for  controller.broker.resource.validation.frequencyInSeconds if it exists.
    * If it doesn't exist, returns the validation controller frequency. This is done in order to retain the current behavior,
-   * wherin the broker resource validation tasks were done at validation controller frequency
+   * wherein the broker resource validation tasks were done at validation controller frequency
+   * The default value is the new DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS
    * @return
    */
   public int getBrokerResourceValidationFrequencyInSeconds() {
@@ -413,7 +408,8 @@ public class ControllerConf extends PropertiesConfiguration {
       return Integer.parseInt(
           (String) getProperty(ControllerPeriodicTasksConf.BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS));
     }
-    return getValidationControllerFrequencyInSeconds();
+    return getInt(ControllerPeriodicTasksConf.DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS,
+        ControllerPeriodicTasksConf.DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS);
   }
 
   public void setBrokerResourceValidationFrequencyInSeconds(int validationFrequencyInSeconds) {
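
The three getters changed in this commit share one lookup order: use the task-specific key if it is set, otherwise fall back to the older key, otherwise use the new per-task default. The following is a minimal standalone sketch of that order, not the ControllerConf code itself. It uses a plain Map instead of PropertiesConfiguration; the class name, the resolve helper, and the literal string for the deprecated key are assumptions made for illustration (the diff only shows the constant name DEPRECATED_VALIDATION_MANAGER_FREQUENCY_IN_SECONDS, not its value).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the Pinot code base.
class FrequencyFallbackSketch {
  // Key name taken from the Javadoc in the diff above.
  static final String REALTIME_KEY = "controller.realtime.segment.validation.frequencyInSeconds";
  // Assumption: the real string behind the deprecated constant is not shown in the diff.
  static final String DEPRECATED_KEY = "controller.validation.frequencyInSeconds";
  // Mirrors DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS (1 hour).
  static final int DEFAULT_REALTIME_FREQUENCY_IN_SECONDS = 60 * 60;

  // Resolves a frequency: new key first, then the legacy key, then the default.
  static int resolve(Map<String, String> conf, String newKey, String legacyKey, int defaultValue) {
    if (conf.containsKey(newKey)) {
      return Integer.parseInt(conf.get(newKey));
    }
    if (conf.containsKey(legacyKey)) {
      return Integer.parseInt(conf.get(legacyKey));
    }
    return defaultValue;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // Nothing configured: the new per-task default applies.
    System.out.println(resolve(conf, REALTIME_KEY, DEPRECATED_KEY, DEFAULT_REALTIME_FREQUENCY_IN_SECONDS));

    // Only the legacy key is configured: its value is still honored.
    conf.put(DEPRECATED_KEY, "1800");
    System.out.println(resolve(conf, REALTIME_KEY, DEPRECATED_KEY, DEFAULT_REALTIME_FREQUENCY_IN_SECONDS));

    // Both keys configured: the task-specific key wins.
    conf.put(REALTIME_KEY, "600");
    System.out.println(resolve(conf, REALTIME_KEY, DEPRECATED_KEY, DEFAULT_REALTIME_FREQUENCY_IN_SECONDS));
  }
}
```

In the diff, the second and third steps are collapsed into a single getInt(key, defaultValue) call on the configuration object. The offline segment interval checker follows the same pattern, with controller.segment.level.validation.intervalInSeconds as the fallback key and the 24 hour default.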




[incubator-pinot] 06/08: Change DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS to 24 hours, to match current default behavior (segment level validation interval)

Posted by ne...@apache.org.

nehapawar pushed a commit to branch split_vm_tasks_2
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 7f624ee9f06a5a1d90960d14ed60a5fdc1df1d66
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 7 18:54:38 2019 -0800

    Change DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS to 24 hours, to match current default behavior (segment level validation interval)
---
 .../src/main/java/com/linkedin/pinot/controller/ControllerConf.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index c6ff82e..b0692a9 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -80,7 +80,7 @@ public class ControllerConf extends PropertiesConfiguration {
     private static final int DEFAULT_RETENTION_CONTROLLER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
     @Deprecated // The ValidationManager has been split up into 3 separate tasks, each having their own frequency config settings
     private static final int DEPRECATED_DEFAULT_VALIDATION_CONTROLLER_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
-    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
+    private static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 6 Hours.
     private static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_BROKER_RESOURCE_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
     private static final int DEFAULT_STATUS_CONTROLLER_FREQUENCY_IN_SECONDS = 5 * 60; // 5 minutes

