You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/18 21:50:33 UTC

[incubator-pinot] 01/01: Move up iteration of tables into ControllerPeriodicTask interface

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch periodic_task_order
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6ca1d1018a4c87a90f21211256918e5abd725686
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Dec 18 13:47:47 2018 -0800

    Move up iteration of tables into ControllerPeriodicTask interface
---
 .../controller/helix/SegmentStatusChecker.java     | 285 +++++++++++----------
 .../helix/core/minion/PinotTaskManager.java        |  99 +++----
 .../core/periodictask/ControllerPeriodicTask.java  |  24 +-
 .../core/relocation/RealtimeSegmentRelocator.java  |  76 +++---
 .../helix/core/retention/RetentionManager.java     |  27 +-
 .../controller/validation/ValidationManager.java   |  89 +++----
 .../periodictask/ControllerPeriodicTaskTest.java   |  17 +-
 7 files changed, 338 insertions(+), 279 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 549dc9d..d444e67 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -47,13 +47,19 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   public static final String ERROR = "ERROR";
   public static final String CONSUMING = "CONSUMING";
   private final ControllerMetrics _metricsRegistry;
-  private final ControllerConf _config;
   private final HelixAdmin _helixAdmin;
+  private final String _helixClusterName;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final int _waitForPushTimeSeconds;

   // log messages about disabled tables atmost once a day
   private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
   private long _lastDisabledTableLogTimestamp = 0;
+  private boolean _logDisabledTables; // Decided once per run in preprocess()
+  private int _realTimeTableCount; // Per-run counts (this + next two): reset in preprocess(), reported in postprocess()
+  private int _offlineTableCount;
+  private int _disabledTableCount;
+

   /**
    * Constructs the segment status checker.
@@ -64,7 +70,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
       ControllerMetrics metricsRegistry) {
     super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), pinotHelixResourceManager);
     _helixAdmin = pinotHelixResourceManager.getHelixAdmin();
-    _config = config;
+    _helixClusterName = pinotHelixResourceManager.getHelixClusterName();
+    _propertyStore = pinotHelixResourceManager.getPropertyStore();
+
     _waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
     _metricsRegistry = metricsRegistry;
   }
@@ -82,166 +90,167 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
   }

   @Override
-  public void process(List<String> tables) {
-    updateSegmentMetrics(tables);
-  }
-
-  /**
-   * Runs a segment status pass over the given tables.
-   * TODO: revisit the logic and reduce the ZK access
-   *
-   * @param tables List of table names
-   */
-  private void updateSegmentMetrics(List<String> tables) {
-    // Fetch the list of tables
-    String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
-    HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
-    int realTimeTableCount = 0;
-    int offlineTableCount = 0;
-    int disabledTableCount = 0;
-    ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+  public void preprocess() {
+    _realTimeTableCount = 0; // Reset the per-run counters that process() accumulates
+    _offlineTableCount = 0;
+    _disabledTableCount = 0;

     // check if we need to log disabled tables log messages
-    boolean logDisabledTables = false;
     long now = System.currentTimeMillis();
     if (now - _lastDisabledTableLogTimestamp >= DISABLED_TABLE_LOG_INTERVAL_MS) {
-      logDisabledTables = true;
+      _logDisabledTables = true;
       _lastDisabledTableLogTimestamp = now;
     } else {
-      logDisabledTables = false;
+      _logDisabledTables = false;
     }
+  }

-    for (String tableName : tables) {
-      try {
-        if (TableNameBuilder.getTableTypeFromTableName(tableName) == TableType.OFFLINE) {
-          offlineTableCount++;
-        } else {
-          realTimeTableCount++;
-        }
-        IdealState idealState = helixAdmin.getResourceIdealState(helixClusterName, tableName);
-        if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
-          int nReplicasFromIdealState = 1;
-          try {
-            if (idealState != null) {
-              nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
-            }
-          } catch (NumberFormatException e) {
-            // Ignore
+  @Override
+  public void process(String tableNameWithType) {
+    updateSegmentMetrics(tableNameWithType);
+  }
+  // Publishes the per-run table counts accumulated by process() as global gauges
+  @Override
+  public void postprocess() {
+    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, _realTimeTableCount);
+    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, _offlineTableCount);
+    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, _disabledTableCount);
+  }
+
+  /**
+   * Runs a segment status pass over the given table, updating its table gauges and the per-run table counters.
+   * TODO: revisit the logic and reduce the ZK access
+   *
+   * @param tableNameWithType Name of the table, including its type suffix
+   */
+  private void updateSegmentMetrics(String tableNameWithType) {
+    // Count this table by type; the counters are reset in preprocess() and published in postprocess()
+    try {
+      if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+        _offlineTableCount++;
+      } else {
+        _realTimeTableCount++;
+      }
+      IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+      if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
+        int nReplicasFromIdealState = 1;
+        try {
+          if (idealState != null) {
+            nReplicasFromIdealState = Integer.parseInt(idealState.getReplicas());
           }
-          _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
-          _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS, 100);
-          _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
-          continue;
+        } catch (NumberFormatException e) {
+          // Ignore
         }
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+        _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+        return;
+      }

-        if (!idealState.isEnabled()) {
-          if (logDisabledTables) {
-            LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableName);
-          }
-          resetTableMetrics(tableName);
-          disabledTableCount++;
-          continue;
+      if (!idealState.isEnabled()) {
+        if (_logDisabledTables) {
+          LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
         }
+        resetTableMetrics(tableNameWithType);
+        _disabledTableCount++;
+        return;
+      }

-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
-            idealState.toString().length());
-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENT_COUNT,
-            (long) (idealState.getPartitionSet().size()));
-        ExternalView externalView = helixAdmin.getResourceExternalView(helixClusterName, tableName);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+          idealState.toString().length());
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
+          (long) (idealState.getPartitionSet().size()));
+      ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType);

-        int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
-        int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
-        int nErrors = 0; // Keeps track of number of segments in error state
-        int nOffline = 0; // Keeps track of number segments with no online replicas
-        int nSegments = 0; // Counts number of segments
-        for (String partitionName : idealState.getPartitionSet()) {
-          int nReplicas = 0;
-          int nIdeal = 0;
-          nSegments++;
-          // Skip segments not online in ideal state
-          for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
-            if (serverAndState == null) {
-              break;
-            }
-            if (serverAndState.getValue().equals(ONLINE)) {
-              nIdeal++;
-              break;
-            }
-          }
-          if (nIdeal == 0) {
-            // No online segments in ideal state
-            continue;
+      int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
+      int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
+      int nErrors = 0; // Keeps track of number of segments in error state
+      int nOffline = 0; // Keeps track of number of segments with no online replicas
+      int nSegments = 0; // Counts number of segments
+      for (String partitionName : idealState.getPartitionSet()) {
+        int nReplicas = 0;
+        int nIdeal = 0;
+        nSegments++;
+        // Skip segments not online in ideal state
+        for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+          if (serverAndState == null) {
+            break;
           }
-          nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax)
-              ? idealState.getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
-          if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
-            // No replicas for this segment
-            TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
-            if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
-              OfflineSegmentZKMetadata segmentZKMetadata =
-                  ZKMetadataProvider.getOfflineSegmentZKMetadata(propertyStore, tableName, partitionName);
-              if (segmentZKMetadata != null
-                  && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
-                // push not yet finished, skip
-                continue;
-              }
-            }
-            nOffline++;
-            if (nOffline < MaxOfflineSegmentsToLog) {
-              LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableName);
-            }
-            nReplicasExternal = 0;
-            continue;
+          if (serverAndState.getValue().equals(ONLINE)) {
+            nIdeal++;
+            break;
           }
-          for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
-            // Count number of online replicas. Ignore if state is CONSUMING.
-            // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
-            // So, ignore this combination. If a segment exists in this combination for a long time, we will get
-            // low level-partition-not-consuming alert anyway.
-            if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
-              nReplicas++;
-            }
-            if (serverAndState.getValue().equals(ERROR)) {
-              nErrors++;
+        }
+        if (nIdeal == 0) {
+          // No online segments in ideal state
+          continue;
+        }
+        nReplicasIdealMax =
+            (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
+                partitionName).size() : nReplicasIdealMax;
+        if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
+          // No replicas for this segment
+          TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+          if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+            OfflineSegmentZKMetadata segmentZKMetadata =
+                ZKMetadataProvider.getOfflineSegmentZKMetadata(_propertyStore, tableNameWithType, partitionName);
+            if (segmentZKMetadata != null
+                && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000L) {
+              // push not yet finished, skip
+              continue;
             }
           }
-          if (nReplicas == 0) {
-            if (nOffline < MaxOfflineSegmentsToLog) {
-              LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableName);
-            }
-            nOffline++;
+          nOffline++;
+          if (nOffline < MaxOfflineSegmentsToLog) {
+            LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
           }
-          nReplicasExternal =
-              ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
-        }
-        if (nReplicasExternal == -1) {
-          nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+          nReplicasExternal = 0;
+          continue;
         }
-        // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS,
-            (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
-        _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
-            (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
-        if (nOffline > 0) {
-          LOGGER.warn("Table {} has {} segments with no online replicas", tableName, nOffline);
+        for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
+          // Count number of online replicas, treating CONSUMING replicas as online.
+          // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
+          // So, do not flag this combination. If a segment exists in this combination for a long time, we will get
+          // low level-partition-not-consuming alert anyway.
+          if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
+            nReplicas++;
+          }
+          if (serverAndState.getValue().equals(ERROR)) {
+            nErrors++;
+          }
         }
-        if (nReplicasExternal < nReplicasIdealMax) {
-          LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableName, nReplicasExternal,
-              nReplicasIdealMax);
+        if (nReplicas == 0) {
+          if (nOffline < MaxOfflineSegmentsToLog) {
+            LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
+          }
+          nOffline++;
         }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while updating segment status for table {}", e, tableName);
-
-        // Remove the metric for this table
-        resetTableMetrics(tableName);
+        nReplicasExternal =
+            ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
       }
-    }
+      if (nReplicasExternal == -1) {
+        nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+      }
+      // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
+          (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+      _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+          (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+      if (nOffline > 0) {
+        LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+      }
+      if (nReplicasExternal < nReplicasIdealMax) {
+        LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
+            nReplicasIdealMax);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while updating segment status for table {}", tableNameWithType, e);

-    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, realTimeTableCount);
-    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, offlineTableCount);
-    _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, disabledTableCount);
+      // Remove the metric for this table
+      resetTableMetrics(tableNameWithType);
+    }
   }

   private void setStatusToDefault() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0ff695d..d1ebd96 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -49,6 +49,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   private final TaskGeneratorRegistry _taskGeneratorRegistry;
   private final ControllerMetrics _controllerMetrics;

+  private Map<String, List<TableConfig>> _enabledTableConfigMap;
+  private Set<String> _taskTypes;
+  private int _numTaskTypes;
+  private Map<String, String> _tasksScheduled = new HashMap<>(); // Replaced by each postprocess() run
+
   public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager,
       @Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
       @Nonnull ControllerMetrics controllerMetrics) {
@@ -81,81 +86,81 @@ public class PinotTaskManager extends ControllerPeriodicTask {
   }

   /**
-   * Check the Pinot cluster status and schedule new tasks for the given tables.
-   *
-   * @param tables List of table names
-   * @return Map from task type to task scheduled
+   * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
    */
-  @Nonnull
-  private Map<String, String> scheduleTasks(List<String> tables) {
+  public Map<String, String> scheduleTasks() {
+    process(_pinotHelixResourceManager.getAllTables());
+    return getTasksScheduled();
+  }
+
+  /**
+   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+   */
+  @Override
+  public void onBecomeNotLeader() {
+    LOGGER.info("Perform task cleanups.");
+    // Performs necessary cleanups for each task type.
+    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+    }
+  }
+
+  // NOTE(review): per-run state lives in unsynchronized fields; confirm scheduleTasks() never overlaps a periodic run.
+  @Override
+  protected void preprocess() {
     _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

-    Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
-    int numTaskTypes = taskTypes.size();
-    Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(numTaskTypes);
+    _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+    _numTaskTypes = _taskTypes.size();
+    _enabledTableConfigMap = new HashMap<>(_numTaskTypes);

-    for (String taskType : taskTypes) {
-      enabledTableConfigMap.put(taskType, new ArrayList<>());
+    for (String taskType : _taskTypes) {
+      _enabledTableConfigMap.put(taskType, new ArrayList<>());

       // Ensure all task queues exist
       _helixTaskResourceManager.ensureTaskQueueExists(taskType);
     }
+  }

-    // Scan all table configs to get the tables with tasks enabled
-    for (String tableName : tables) {
-      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableName);
-      if (tableConfig != null) {
-        TableTaskConfig taskConfig = tableConfig.getTaskConfig();
-        if (taskConfig != null) {
-          for (String taskType : taskTypes) {
-            if (taskConfig.isTaskTypeEnabled(taskType)) {
-              enabledTableConfigMap.get(taskType).add(tableConfig);
-            }
+  @Override
+  protected void process(String tableNameWithType) {
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig != null) {
+      TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+      if (taskConfig != null) {
+        for (String taskType : _taskTypes) {
+          if (taskConfig.isTaskTypeEnabled(taskType)) {
+            _enabledTableConfigMap.get(taskType).add(tableConfig);
           }
         }
       }
     }
+  }

+  @Override
+  protected void postprocess() {
     // Generate each type of tasks
-    Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
-    for (String taskType : taskTypes) {
+    _tasksScheduled = new HashMap<>(_numTaskTypes);
+    for (String taskType : _taskTypes) {
       LOGGER.info("Generating tasks for task type: {}", taskType);
       PinotTaskGenerator pinotTaskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
-      List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
+      List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
       int numTasks = pinotTaskConfigs.size();
       if (numTasks > 0) {
         LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType,
             pinotTaskConfigs);
-        tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
+        _tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
             pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
         _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
       }
     }
-
-    return tasksScheduled;
   }

   /**
-   * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
-   */
-  public Map<String, String> scheduleTasks() {
-    return scheduleTasks(_pinotHelixResourceManager.getAllTables());
-  }
-
-  /**
-   * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+   * Returns the tasks scheduled by the most recent postprocess() run.
+   * @return Map from task type to task scheduled; empty (never null) until the first run completes
    */
-  @Override
-  public void onBecomeNotLeader() {
-    LOGGER.info("Perform task cleanups.");
-    // Performs necessary cleanups for each task type.
-    for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
-      _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
-    }
-  }
-
-  @Override
-  public void process(List<String> tables) {
-    scheduleTasks(tables);
+  public Map<String, String> getTasksScheduled() {
+    return _tasksScheduled;
   }
 }
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3b12c0f..9ce5c59 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -110,7 +110,29 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
    *
    * @param tables List of table names
    */
-  public abstract void process(List<String> tables);
+  protected void process(List<String> tables) {
+    preprocess();
+    for (String table : tables) {
+      process(table);
+    }
+    postprocess();
+  }
+
+  /**
+   * Hook invoked once per run, before any table is processed.
+   */
+  protected abstract void preprocess();
+
+  /**
+   * Processes a single table. An exception thrown here aborts the remaining tables and postprocess().
+   * @param tableNameWithType Name of the table to process, including its type suffix
+   */
+  protected abstract void process(String tableNameWithType);
+
+  /**
+   * Hook invoked once per run, after every table has been processed.
+   */
+  protected abstract void postprocess();

   @VisibleForTesting
   protected boolean isLeader() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index d152887..01201fa 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,51 +58,59 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
   }

   @Override
-  public void process(List<String> tables) {
-    runRelocation(tables);
+  protected void preprocess() {
+    // No per-run setup is needed before processing tables
+  }
+
+  @Override
+  protected void process(String tableNameWithType) {
+    runRelocation(tableNameWithType);
+  }
+
+  @Override
+  protected void postprocess() {
+    // No per-run work is needed after processing tables
   }

   /**
-   * Check the given tables. Perform relocation of segments if table is realtime and relocation is required
+   * Check the given table. Perform relocation of segments if table is realtime and relocation is required
    * TODO: Model this to implement {@link com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} interface
    * https://github.com/linkedin/pinot/issues/2609
    *
-   * @param tables List of table names
+   * @param tableNameWithType Name of the table with type suffix; only realtime tables are relocated
    */
-  private void runRelocation(List<String> tables) {
-    for (String tableNameWithType : tables) {
-      // Only consider realtime tables.
-      if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
-        continue;
+  private void runRelocation(String tableNameWithType) {
+    // Only consider realtime tables.
+    if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+      return;
+    }
+    try {
+      LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
+
+      TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+      final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
+      if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+        LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
+        return;
       }
-      try {
-        LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
-
-        TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
-        final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
-        if (!realtimeTagConfig.isRelocateCompletedSegments()) {
-          LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
-          continue;
-        }

-        Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
-          @Nullable
-          @Override
-          public IdealState apply(@Nullable IdealState idealState) {
-            if (!idealState.isEnabled()) {
-              LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
-              return null;
-            }
-            relocateSegments(realtimeTagConfig, idealState);
-            return idealState;
+      Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
+        @Nullable
+        @Override
+        public IdealState apply(@Nullable IdealState idealState) {
+          if (!idealState.isEnabled()) { // NOTE(review): idealState is @Nullable but dereferenced unconditionally
+            LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
+            return null;
           }
-        };
+          relocateSegments(realtimeTagConfig, idealState);
+          return idealState;
+        }
+      };

-        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
-            RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
-      } catch (Exception e) {
-        LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
-      }
+      HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
+          RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
+    } catch (Exception e) {
+      LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
     }
   }

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index d76cc3b..c0b3e79 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -62,29 +62,30 @@ public class RetentionManager extends ControllerPeriodicTask {
   }

   @Override
-  public void process(List<String> tables) {
-    execute(tables);
+  protected void preprocess() {
+    // No per-run setup is needed before processing tables
   }

-  /**
-   * Manages retention for the given tables.
-   *
-   * @param tables List of table names
-   */
-  private void execute(List<String> tables) {
+  @Override
+  protected void process(String tableNameWithType) {
     try {
-      for (String tableNameWithType : tables) {
-        LOGGER.info("Start managing retention for table: {}", tableNameWithType);
-        manageRetentionForTable(tableNameWithType);
-      }
-
-      LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
-      _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+      LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+      manageRetentionForTable(tableNameWithType);
     } catch (Exception e) {
-      LOGGER.error("Caught exception while managing retention for all tables", e);
+      LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
     }
   }

+  @Override
+  protected void postprocess() {
+    try {
+      LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
+      _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while removing aged deleted segments", e);
+    }
+  }
+
   private void manageRetentionForTable(String tableNameWithType) {
     try {
       // Build retention strategy from table config
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 8717318..b9b775d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -23,7 +23,7 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
 import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import com.linkedin.pinot.common.metrics.ValidationMetrics;
-import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
+import com.linkedin.pinot.common.utils.CommonConstants;
 import com.linkedin.pinot.common.utils.HLCSegmentName;
 import com.linkedin.pinot.common.utils.SegmentName;
 import com.linkedin.pinot.common.utils.time.TimeUtils;
@@ -57,6 +57,8 @@ public class ValidationManager extends ControllerPeriodicTask {
   private final ValidationMetrics _validationMetrics;

   private long _lastSegmentLevelValidationTimeMs = 0L;
+  private boolean _runSegmentLevelValidation; // Recomputed each run in preprocess() from the validation interval
+  private List<InstanceConfig> _instanceConfigs;

   public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
@@ -74,61 +76,62 @@ public class ValidationManager extends ControllerPeriodicTask {
   }
 
   @Override
-  public void process(List<String> tables) {
-    runValidation(tables);
-  }
-
-  /**
-   * Runs a validation pass over the given tables.
-   *
-   * @param tables List of table names
-   */
-  private void runValidation(List<String> tables) {
+  public void preprocess() {
     // Run segment level validation using a separate interval
-    boolean runSegmentLevelValidation = false;
+    _runSegmentLevelValidation = false;
     long currentTimeMs = System.currentTimeMillis();
     if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
         >= _segmentLevelValidationIntervalInSeconds) {
       LOGGER.info("Run segment-level validation");
-      runSegmentLevelValidation = true;
+      _runSegmentLevelValidation = true;
       _lastSegmentLevelValidationTimeMs = currentTimeMs;
     }
 
     // Cache instance configs to reduce ZK access
-    List<InstanceConfig> instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
-
-    for (String tableNameWithType : tables) {
-      try {
-        TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
-        if (tableConfig == null) {
-          LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
-          continue;
-        }
+    _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+  }
 
-        // Rebuild broker resource
-        Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(instanceConfigs,
-            tableConfig.getTenantConfig().getBroker());
-        _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+  @Override
+  public void process(String tableNameWithType) {
+    runValidation(tableNameWithType);
+  }
 
-        // Perform validation based on the table type
-        TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-        if (tableType == TableType.OFFLINE) {
-          if (runSegmentLevelValidation) {
-            validateOfflineSegmentPush(tableConfig);
-          }
-        } else {
-          if (runSegmentLevelValidation) {
-            updateRealtimeDocumentCount(tableConfig);
-          }
-          Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
-          StreamConfig streamConfig = new StreamConfig(streamConfigMap);
-          if (streamConfig.hasLowLevelConsumerType()) {
-            _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
-          }
+  @Override
+  public void postprocess() {
+
+  }
+
+  private void runValidation(String tableNameWithType) {
+    try {
+      TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+      if (tableConfig == null) {
+        LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+        return;
+      }
+
+      // Rebuild broker resource
+      Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+          tableConfig.getTenantConfig().getBroker());
+      _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+
+      // Perform validation based on the table type
+      CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
+        if (_runSegmentLevelValidation) {
+          validateOfflineSegmentPush(tableConfig);
+        }
+      } else {
+        if (_runSegmentLevelValidation) {
+          updateRealtimeDocumentCount(tableConfig);
+        }
+        Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+        StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+        if (streamConfig.hasLowLevelConsumerType()) {
+          _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
         }
-      } catch (Exception e) {
-        LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
       }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
     }
   }
 
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 9a5d26c..1ada214 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -115,7 +115,22 @@ public class ControllerPeriodicTaskTest {
     }
 
     @Override
-    public void process(List<String> tables) {
+    protected void process(List<String> tables) {
+
+    }
+
+    @Override
+    protected void preprocess() {
+
+    }
+
+    @Override
+    protected void process(String tableNameWithType) {
+
+    }
+
+    @Override
+    public void postprocess() {
 
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org