You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2018/12/18 21:50:33 UTC
[incubator-pinot] 01/01: Move up iteration of tables into ControllerPeriodicTask interface
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch periodic_task_order
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6ca1d1018a4c87a90f21211256918e5abd725686
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Dec 18 13:47:47 2018 -0800
Move up iteration of tables into ControllerPeriodicTask interface
---
.../controller/helix/SegmentStatusChecker.java | 285 +++++++++++----------
.../helix/core/minion/PinotTaskManager.java | 99 +++----
.../core/periodictask/ControllerPeriodicTask.java | 24 +-
.../core/relocation/RealtimeSegmentRelocator.java | 76 +++---
.../helix/core/retention/RetentionManager.java | 27 +-
.../controller/validation/ValidationManager.java | 89 +++----
.../periodictask/ControllerPeriodicTaskTest.java | 17 +-
7 files changed, 338 insertions(+), 279 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
index 549dc9d..d444e67 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/SegmentStatusChecker.java
@@ -47,13 +47,19 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";
private final ControllerMetrics _metricsRegistry;
- private final ControllerConf _config;
private final HelixAdmin _helixAdmin;
+ private final String _helixClusterName;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final int _waitForPushTimeSeconds;
// log messages about disabled tables atmost once a day
private static final long DISABLED_TABLE_LOG_INTERVAL_MS = TimeUnit.DAYS.toMillis(1);
private long _lastDisabledTableLogTimestamp = 0;
+ private boolean _logDisabledTables;
+ private int _realTimeTableCount;
+ private int _offlineTableCount;
+ private int _disabledTableCount;
+
/**
* Constructs the segment status checker.
@@ -64,7 +70,9 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
ControllerMetrics metricsRegistry) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(), pinotHelixResourceManager);
_helixAdmin = pinotHelixResourceManager.getHelixAdmin();
- _config = config;
+ _helixClusterName = pinotHelixResourceManager.getHelixClusterName();
+ _propertyStore = _pinotHelixResourceManager.getPropertyStore();
+
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
_metricsRegistry = metricsRegistry;
}
@@ -82,166 +90,167 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- updateSegmentMetrics(tables);
- }
-
- /**
- * Runs a segment status pass over the given tables.
- * TODO: revisit the logic and reduce the ZK access
- *
- * @param tables List of table names
- */
- private void updateSegmentMetrics(List<String> tables) {
- // Fetch the list of tables
- String helixClusterName = _pinotHelixResourceManager.getHelixClusterName();
- HelixAdmin helixAdmin = _pinotHelixResourceManager.getHelixAdmin();
- int realTimeTableCount = 0;
- int offlineTableCount = 0;
- int disabledTableCount = 0;
- ZkHelixPropertyStore<ZNRecord> propertyStore = _pinotHelixResourceManager.getPropertyStore();
+ public void preprocess() {
+ _realTimeTableCount = 0;
+ _offlineTableCount = 0;
+ _disabledTableCount = 0;
// check if we need to log disabled tables log messages
- boolean logDisabledTables = false;
long now = System.currentTimeMillis();
if (now - _lastDisabledTableLogTimestamp >= DISABLED_TABLE_LOG_INTERVAL_MS) {
- logDisabledTables = true;
+ _logDisabledTables = true;
_lastDisabledTableLogTimestamp = now;
} else {
- logDisabledTables = false;
+ _logDisabledTables = false;
}
+ }
- for (String tableName : tables) {
- try {
- if (TableNameBuilder.getTableTypeFromTableName(tableName) == TableType.OFFLINE) {
- offlineTableCount++;
- } else {
- realTimeTableCount++;
- }
- IdealState idealState = helixAdmin.getResourceIdealState(helixClusterName, tableName);
- if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
- int nReplicasFromIdealState = 1;
- try {
- if (idealState != null) {
- nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
- }
- } catch (NumberFormatException e) {
- // Ignore
+ @Override
+ public void process(String tableNameWithType) {
+ updateSegmentMetrics(tableNameWithType);
+ }
+
+ @Override
+ public void postprocess() {
+ _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, _realTimeTableCount);
+ _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, _offlineTableCount);
+ _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, _disabledTableCount);
+ }
+
+ /**
+ * Runs a segment status pass over the given table.
+ * TODO: revisit the logic and reduce the ZK access
+ *
+ * @param tableNameWithType
+ */
+ private void updateSegmentMetrics(String tableNameWithType) {
+
+ try {
+ if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+ _offlineTableCount++;
+ } else {
+ _realTimeTableCount++;
+ }
+ IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
+ if ((idealState == null) || (idealState.getPartitionSet().isEmpty())) {
+ int nReplicasFromIdealState = 1;
+ try {
+ if (idealState != null) {
+ nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
}
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS, 100);
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
- continue;
+ } catch (NumberFormatException e) {
+ // Ignore
}
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ return;
+ }
- if (!idealState.isEnabled()) {
- if (logDisabledTables) {
- LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableName);
- }
- resetTableMetrics(tableName);
- disabledTableCount++;
- continue;
+ if (!idealState.isEnabled()) {
+ if (_logDisabledTables) {
+ LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
}
+ resetTableMetrics(tableNameWithType);
+ _disabledTableCount++;
+ return;
+ }
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
- idealState.toString().length());
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENT_COUNT,
- (long) (idealState.getPartitionSet().size()));
- ExternalView externalView = helixAdmin.getResourceExternalView(helixClusterName, tableName);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+ idealState.toString().length());
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
+ (long) (idealState.getPartitionSet().size()));
+ ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableNameWithType);
- int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
- int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
- int nErrors = 0; // Keeps track of number of segments in error state
- int nOffline = 0; // Keeps track of number segments with no online replicas
- int nSegments = 0; // Counts number of segments
- for (String partitionName : idealState.getPartitionSet()) {
- int nReplicas = 0;
- int nIdeal = 0;
- nSegments++;
- // Skip segments not online in ideal state
- for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
- if (serverAndState == null) {
- break;
- }
- if (serverAndState.getValue().equals(ONLINE)) {
- nIdeal++;
- break;
- }
- }
- if (nIdeal == 0) {
- // No online segments in ideal state
- continue;
+ int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
+ int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
+ int nErrors = 0; // Keeps track of number of segments in error state
+ int nOffline = 0; // Keeps track of number segments with no online replicas
+ int nSegments = 0; // Counts number of segments
+ for (String partitionName : idealState.getPartitionSet()) {
+ int nReplicas = 0;
+ int nIdeal = 0;
+ nSegments++;
+ // Skip segments not online in ideal state
+ for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+ if (serverAndState == null) {
+ break;
}
- nReplicasIdealMax = (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax)
- ? idealState.getInstanceStateMap(partitionName).size() : nReplicasIdealMax;
- if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
- // No replicas for this segment
- TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
- if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
- OfflineSegmentZKMetadata segmentZKMetadata =
- ZKMetadataProvider.getOfflineSegmentZKMetadata(propertyStore, tableName, partitionName);
- if (segmentZKMetadata != null
- && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
- // push not yet finished, skip
- continue;
- }
- }
- nOffline++;
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableName);
- }
- nReplicasExternal = 0;
- continue;
+ if (serverAndState.getValue().equals(ONLINE)) {
+ nIdeal++;
+ break;
}
- for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
- // Count number of online replicas. Ignore if state is CONSUMING.
- // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
- // So, ignore this combination. If a segment exists in this combination for a long time, we will get
- // low level-partition-not-consuming alert anyway.
- if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
- nReplicas++;
- }
- if (serverAndState.getValue().equals(ERROR)) {
- nErrors++;
+ }
+ if (nIdeal == 0) {
+ // No online segments in ideal state
+ continue;
+ }
+ nReplicasIdealMax =
+ (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
+ partitionName).size() : nReplicasIdealMax;
+ if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
+ // No replicas for this segment
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+ OfflineSegmentZKMetadata segmentZKMetadata =
+ ZKMetadataProvider.getOfflineSegmentZKMetadata(_propertyStore, tableNameWithType, partitionName);
+ if (segmentZKMetadata != null
+ && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
+ // push not yet finished, skip
+ continue;
}
}
- if (nReplicas == 0) {
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableName);
- }
- nOffline++;
+ nOffline++;
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
}
- nReplicasExternal =
- ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
- }
- if (nReplicasExternal == -1) {
- nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ nReplicasExternal = 0;
+ continue;
}
- // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_OF_REPLICAS,
- (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
- _metricsRegistry.setValueOfTableGauge(tableName, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
- (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
- if (nOffline > 0) {
- LOGGER.warn("Table {} has {} segments with no online replicas", tableName, nOffline);
+ for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
+ // Count number of online replicas. Ignore if state is CONSUMING.
+ // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
+ // So, ignore this combination. If a segment exists in this combination for a long time, we will get
+ // low level-partition-not-consuming alert anyway.
+ if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
+ nReplicas++;
+ }
+ if (serverAndState.getValue().equals(ERROR)) {
+ nErrors++;
+ }
}
- if (nReplicasExternal < nReplicasIdealMax) {
- LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableName, nReplicasExternal,
- nReplicasIdealMax);
+ if (nReplicas == 0) {
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
+ }
+ nOffline++;
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while updating segment status for table {}", e, tableName);
-
- // Remove the metric for this table
- resetTableMetrics(tableName);
+ nReplicasExternal =
+ ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
}
- }
+ if (nReplicasExternal == -1) {
+ nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ }
+ // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
+ (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+ (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+ if (nOffline > 0) {
+ LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+ }
+ if (nReplicasExternal < nReplicasIdealMax) {
+ LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
+ nReplicasIdealMax);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while updating segment status for table {}", e, tableNameWithType);
- _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, realTimeTableCount);
- _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, offlineTableCount);
- _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT, disabledTableCount);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
+ }
}
private void setStatusToDefault() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
index 0ff695d..d1ebd96 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -49,6 +49,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
private final TaskGeneratorRegistry _taskGeneratorRegistry;
private final ControllerMetrics _controllerMetrics;
+ private Map<String, List<TableConfig>> _enabledTableConfigMap;
+ private Set<String> _taskTypes;
+ private int _numTaskTypes;
+ private Map<String, String> _tasksScheduled;
+
public PinotTaskManager(@Nonnull PinotHelixTaskResourceManager helixTaskResourceManager,
@Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
@Nonnull ControllerMetrics controllerMetrics) {
@@ -81,81 +86,81 @@ public class PinotTaskManager extends ControllerPeriodicTask {
}
/**
- * Check the Pinot cluster status and schedule new tasks for the given tables.
- *
- * @param tables List of table names
- * @return Map from task type to task scheduled
+ * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
*/
- @Nonnull
- private Map<String, String> scheduleTasks(List<String> tables) {
+ public Map<String, String> scheduleTasks() {
+ process(_pinotHelixResourceManager.getAllTables());
+ return getTasksScheduled();
+ }
+
+ /**
+ * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+ */
+ @Override
+ public void onBecomeNotLeader() {
+ LOGGER.info("Perform task cleanups.");
+ // Performs necessary cleanups for each task type.
+ for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
+ _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
+ }
+ }
+
+
+ @Override
+ protected void preprocess() {
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
- Set<String> taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
- int numTaskTypes = taskTypes.size();
- Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>(numTaskTypes);
+ _taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
+ _numTaskTypes = _taskTypes.size();
+ _enabledTableConfigMap = new HashMap<>(_numTaskTypes);
- for (String taskType : taskTypes) {
- enabledTableConfigMap.put(taskType, new ArrayList<>());
+ for (String taskType : _taskTypes) {
+ _enabledTableConfigMap.put(taskType, new ArrayList<>());
// Ensure all task queues exist
_helixTaskResourceManager.ensureTaskQueueExists(taskType);
}
+ }
- // Scan all table configs to get the tables with tasks enabled
- for (String tableName : tables) {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableName);
- if (tableConfig != null) {
- TableTaskConfig taskConfig = tableConfig.getTaskConfig();
- if (taskConfig != null) {
- for (String taskType : taskTypes) {
- if (taskConfig.isTaskTypeEnabled(taskType)) {
- enabledTableConfigMap.get(taskType).add(tableConfig);
- }
+ @Override
+ protected void process(String tableNameWithType) {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig != null) {
+ TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+ if (taskConfig != null) {
+ for (String taskType : _taskTypes) {
+ if (taskConfig.isTaskTypeEnabled(taskType)) {
+ _enabledTableConfigMap.get(taskType).add(tableConfig);
}
}
}
}
+ }
+ @Override
+ protected void postprocess() {
// Generate each type of tasks
- Map<String, String> tasksScheduled = new HashMap<>(numTaskTypes);
- for (String taskType : taskTypes) {
+ _tasksScheduled = new HashMap<>(_numTaskTypes);
+ for (String taskType : _taskTypes) {
LOGGER.info("Generating tasks for task type: {}", taskType);
PinotTaskGenerator pinotTaskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
- List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(enabledTableConfigMap.get(taskType));
+ List<PinotTaskConfig> pinotTaskConfigs = pinotTaskGenerator.generateTasks(_enabledTableConfigMap.get(taskType));
int numTasks = pinotTaskConfigs.size();
if (numTasks > 0) {
LOGGER.info("Submitting {} tasks for task type: {} with task configs: {}", numTasks, taskType,
pinotTaskConfigs);
- tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
+ _tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
-
- return tasksScheduled;
}
/**
- * Public API to schedule tasks. It doesn't matter whether current pinot controller is leader.
- */
- public Map<String, String> scheduleTasks() {
- return scheduleTasks(_pinotHelixResourceManager.getAllTables());
- }
-
- /**
- * Performs necessary cleanups (e.g. remove metrics) when the controller leadership changes.
+ * Returns the tasks that have been scheduled as part of the postprocess
+ * @return
*/
- @Override
- public void onBecomeNotLeader() {
- LOGGER.info("Perform task cleanups.");
- // Performs necessary cleanups for each task type.
- for (String taskType : _taskGeneratorRegistry.getAllTaskTypes()) {
- _taskGeneratorRegistry.getTaskGenerator(taskType).nonLeaderCleanUp();
- }
- }
-
- @Override
- public void process(List<String> tables) {
- scheduleTasks(tables);
+ public Map<String, String> getTasksScheduled() {
+ return _tasksScheduled;
}
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 3b12c0f..9ce5c59 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -110,7 +110,29 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
*
* @param tables List of table names
*/
- public abstract void process(List<String> tables);
+ protected void process(List<String> tables) {
+ preprocess();
+ for (String table : tables) {
+ process(table);
+ }
+ postprocess();
+ }
+
+ /**
+ * This method runs before processing all tables
+ */
+ protected abstract void preprocess();
+
+ /**
+ * Process the controller periodic task for the given table
+ * @param tableNameWithType
+ */
+ protected abstract void process(String tableNameWithType);
+
+ /**
+ * This method runs after processing all tables
+ */
+ protected abstract void postprocess();
@VisibleForTesting
protected boolean isLeader() {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index d152887..01201fa 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,51 +58,59 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- runRelocation(tables);
+ protected void preprocess() {
+
+ }
+
+ @Override
+ protected void process(String tableNameWithType) {
+ runRelocation(tableNameWithType);
+ }
+
+ @Override
+ protected void postprocess() {
+
}
/**
- * Check the given tables. Perform relocation of segments if table is realtime and relocation is required
+ * Check the given table. Perform relocation of segments if table is realtime and relocation is required
* TODO: Model this to implement {@link com.linkedin.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy} interface
* https://github.com/linkedin/pinot/issues/2609
*
- * @param tables List of table names
+ * @param tableNameWithType
*/
- private void runRelocation(List<String> tables) {
- for (String tableNameWithType : tables) {
- // Only consider realtime tables.
- if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
- continue;
+ private void runRelocation(String tableNameWithType) {
+ // Only consider realtime tables.
+ if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+ return;
+ }
+ try {
+ LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
+
+ TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+ final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
+ if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+ LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
+ return;
}
- try {
- LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
-
- TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
- final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
- if (!realtimeTagConfig.isRelocateCompletedSegments()) {
- LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
- continue;
- }
- Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
- @Nullable
- @Override
- public IdealState apply(@Nullable IdealState idealState) {
- if (!idealState.isEnabled()) {
- LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
- return null;
- }
- relocateSegments(realtimeTagConfig, idealState);
- return idealState;
+ Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
+ @Nullable
+ @Override
+ public IdealState apply(@Nullable IdealState idealState) {
+ if (!idealState.isEnabled()) {
+ LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
+ return null;
}
- };
+ relocateSegments(realtimeTagConfig, idealState);
+ return idealState;
+ }
+ };
- HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
- RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
- } catch (Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
- }
+ HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
+ RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
+ } catch (Exception e) {
+ LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
}
}
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
index d76cc3b..c0b3e79 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/retention/RetentionManager.java
@@ -62,29 +62,26 @@ public class RetentionManager extends ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- execute(tables);
+ protected void preprocess() {
+
}
- /**
- * Manages retention for the given tables.
- *
- * @param tables List of table names
- */
- private void execute(List<String> tables) {
+ @Override
+ protected void process(String tableNameWithType) {
try {
- for (String tableNameWithType : tables) {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
- manageRetentionForTable(tableNameWithType);
- }
-
- LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
- _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ manageRetentionForTable(tableNameWithType);
} catch (Exception e) {
LOGGER.error("Caught exception while managing retention for all tables", e);
}
}
+ @Override
+ protected void postprocess() {
+ LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
+ _pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
+ }
+
private void manageRetentionForTable(String tableNameWithType) {
try {
// Build retention strategy from table config
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
index 8717318..b9b775d 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/validation/ValidationManager.java
@@ -23,7 +23,7 @@ import com.linkedin.pinot.common.config.TableNameBuilder;
import com.linkedin.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import com.linkedin.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import com.linkedin.pinot.common.metrics.ValidationMetrics;
-import com.linkedin.pinot.common.utils.CommonConstants.Helix.TableType;
+import com.linkedin.pinot.common.utils.CommonConstants;
import com.linkedin.pinot.common.utils.HLCSegmentName;
import com.linkedin.pinot.common.utils.SegmentName;
import com.linkedin.pinot.common.utils.time.TimeUtils;
@@ -57,6 +57,8 @@ public class ValidationManager extends ControllerPeriodicTask {
private final ValidationMetrics _validationMetrics;
private long _lastSegmentLevelValidationTimeMs = 0L;
+ private boolean _runSegmentLevelValidation;
+ private List<InstanceConfig> _instanceConfigs;
public ValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
@@ -74,61 +76,62 @@ public class ValidationManager extends ControllerPeriodicTask {
}
@Override
- public void process(List<String> tables) {
- runValidation(tables);
- }
-
- /**
- * Runs a validation pass over the given tables.
- *
- * @param tables List of table names
- */
- private void runValidation(List<String> tables) {
+ public void preprocess() {
// Run segment level validation using a separate interval
- boolean runSegmentLevelValidation = false;
+ _runSegmentLevelValidation = false;
long currentTimeMs = System.currentTimeMillis();
if (TimeUnit.MILLISECONDS.toSeconds(currentTimeMs - _lastSegmentLevelValidationTimeMs)
>= _segmentLevelValidationIntervalInSeconds) {
LOGGER.info("Run segment-level validation");
- runSegmentLevelValidation = true;
+ _runSegmentLevelValidation = true;
_lastSegmentLevelValidationTimeMs = currentTimeMs;
}
// Cache instance configs to reduce ZK access
- List<InstanceConfig> instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
-
- for (String tableNameWithType : tables) {
- try {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
- continue;
- }
+ _instanceConfigs = _pinotHelixResourceManager.getAllHelixInstanceConfigs();
+ }
- // Rebuild broker resource
- Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+ @Override
+ public void process(String tableNameWithType) {
+ runValidation(tableNameWithType);
+ }
- // Perform validation based on the table type
- TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.OFFLINE) {
- if (runSegmentLevelValidation) {
- validateOfflineSegmentPush(tableConfig);
- }
- } else {
- if (runSegmentLevelValidation) {
- updateRealtimeDocumentCount(tableConfig);
- }
- Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
- }
+ @Override
+ public void postprocess() {
+
+ }
+
+ private void runValidation(String tableNameWithType) {
+ try {
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+ return;
+ }
+
+ // Rebuild broker resource
+ Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
+
+ // Perform validation based on the table type
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
+ if (_runSegmentLevelValidation) {
+ validateOfflineSegmentPush(tableConfig);
+ }
+ } else {
+ if (_runSegmentLevelValidation) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+ Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.validateLLCSegments(tableConfig);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
}
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while validating table: {}", tableNameWithType, e);
}
}
diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 9a5d26c..1ada214 100644
--- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -115,7 +115,22 @@ public class ControllerPeriodicTaskTest {
}
@Override
- public void process(List<String> tables) {
+ protected void process(List<String> tables) {
+
+ }
+
+ @Override
+ protected void preprocess() {
+
+ }
+
+ @Override
+ protected void process(String tableNameWithType) {
+
+ }
+
+ @Override
+ public void postprocess() {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org