You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2019/01/28 21:37:26 UTC
[incubator-pinot] branch master updated: Fix
SegmentStatusCheckerIntegrationTest setup timings (#3749)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9004e38 Fix SegmentStatusCheckerIntegrationTest setup timings (#3749)
9004e38 is described below
commit 9004e3836651e650b13bda4e439b2f5fba6c5054
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Mon Jan 28 13:37:20 2019 -0800
Fix SegmentStatusCheckerIntegrationTest setup timings (#3749)
* Fix SegmentStatusCheckerIntegrationTest setup timings
* Keep only 1 ControllerGauge value and use suffix for task name
* Further refactoring so that numTablesProcessed is counted in the base class of the periodic tasks
---
.../pinot/common/metrics/AbstractMetrics.java | 55 +++--
.../pinot/common/metrics/ControllerGauge.java | 2 +
.../apache/pinot/controller/ControllerStarter.java | 14 +-
.../controller/helix/SegmentStatusChecker.java | 237 ++++++++++-----------
.../helix/core/minion/PinotTaskManager.java | 13 +-
.../core/periodictask/ControllerPeriodicTask.java | 23 +-
.../core/relocation/RealtimeSegmentRelocator.java | 63 +++---
.../helix/core/retention/RetentionManager.java | 77 ++++---
.../BrokerResourceValidationManager.java | 32 +--
.../validation/OfflineSegmentIntervalChecker.java | 29 ++-
.../RealtimeSegmentValidationManager.java | 44 ++--
.../periodictask/ControllerPeriodicTaskTest.java | 42 ++--
.../relocation/RealtimeSegmentRelocatorTest.java | 11 +-
.../helix/core/retention/RetentionManagerTest.java | 8 +-
.../tasks/SegmentStatusCheckerIntegrationTest.java | 41 +++-
15 files changed, 389 insertions(+), 302 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 6f78211..7857f57 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -327,23 +327,22 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
String gaugeName = gauge.getGaugeName();
fullGaugeName = gaugeName + "." + getTableName(tableName);
- if (!_gaugeValues.containsKey(fullGaugeName)) {
- synchronized (_gaugeValues) {
- if(!_gaugeValues.containsKey(fullGaugeName)) {
- _gaugeValues.put(fullGaugeName, new AtomicLong(value));
- addCallbackGauge(fullGaugeName, new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- return _gaugeValues.get(fullGaugeName).get();
- }
- });
- } else {
- _gaugeValues.get(fullGaugeName).set(value);
- }
- }
- } else {
- _gaugeValues.get(fullGaugeName).set(value);
- }
+ setValueOfGauge(value, fullGaugeName);
+ }
+
+ /**
+ * Sets the value of a global gauge whose name is the gauge name followed by "." and the given suffix.
+ *
+ * @param gauge The gauge to use
+ * @param suffix The suffix to attach to the gauge name
+ * @param value The value to set the gauge to
+ */
+ public void setValueOfGlobalGauge(final G gauge, final String suffix, final long value) {
+ final String fullGaugeName;
+ String gaugeName = gauge.getGaugeName();
+ fullGaugeName = gaugeName + "." + suffix;
+
+ setValueOfGauge(value, fullGaugeName);
}
/**
@@ -355,16 +354,15 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
public void setValueOfGlobalGauge(final G gauge, final long value) {
final String gaugeName = gauge.getGaugeName();
+ setValueOfGauge(value, gaugeName);
+ }
+
+ private void setValueOfGauge(long value, String gaugeName) {
if (!_gaugeValues.containsKey(gaugeName)) {
synchronized (_gaugeValues) {
if(!_gaugeValues.containsKey(gaugeName)) {
_gaugeValues.put(gaugeName, new AtomicLong(value));
- addCallbackGauge(gaugeName, new Callable<Long>() {
- @Override
- public Long call() throws Exception {
- return _gaugeValues.get(gaugeName).get();
- }
- });
+ addCallbackGauge(gaugeName, () -> _gaugeValues.get(gaugeName).get());
} else {
_gaugeValues.get(gaugeName).set(value);
}
@@ -412,6 +410,17 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
}
}
+
+ @VisibleForTesting
+ public long getValueOfGlobalGauge(final G gauge, String suffix) {
+ String fullGaugeName = gauge.getGaugeName() + "." + suffix;
+ if (!_gaugeValues.containsKey(fullGaugeName)) {
+ return 0;
+ } else {
+ return _gaugeValues.get(fullGaugeName).get();
+ }
+ }
+
/**
* Gets the value of a table gauge.
*
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 38ba4df..0c18040 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -38,6 +38,8 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
OFFLINE_TABLE_COUNT("TableCount", true),
DISABLED_TABLE_COUNT("TableCount", true),
+ PERIODIC_TASK_NUM_TABLES_PROCESSED("PeriodicTaskNumTablesProcessed", true),
+
SHORT_OF_LIVE_INSTANCES("ShortOfLiveInstances", false), // Number of extra live instances needed.
REALTIME_TABLE_ESTIMATED_SIZE("RealtimeTableEstimatedSize", false), // Estimated size of realtime table.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index ab0a7ad..6b0efc8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -300,21 +300,21 @@ public class ControllerStarter {
List<PeriodicTask> periodicTasks = new ArrayList<>();
_taskManager = new PinotTaskManager(_helixTaskResourceManager, _helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_taskManager);
- _retentionManager = new RetentionManager(_helixResourceManager, _config);
+ _retentionManager = new RetentionManager(_helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_retentionManager);
_offlineSegmentIntervalChecker =
- new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry));
+ new OfflineSegmentIntervalChecker(_config, _helixResourceManager, new ValidationMetrics(_metricsRegistry),
+ _controllerMetrics);
periodicTasks.add(_offlineSegmentIntervalChecker);
- _realtimeSegmentValidationManager =
- new RealtimeSegmentValidationManager(_config, _helixResourceManager, PinotLLCRealtimeSegmentManager.getInstance(),
- new ValidationMetrics(_metricsRegistry));
+ _realtimeSegmentValidationManager = new RealtimeSegmentValidationManager(_config, _helixResourceManager,
+ PinotLLCRealtimeSegmentManager.getInstance(), new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
- new BrokerResourceValidationManager(_config, _helixResourceManager);
+ new BrokerResourceValidationManager(_config, _helixResourceManager, _controllerMetrics);
periodicTasks.add(_brokerResourceValidationManager);
_segmentStatusChecker = new SegmentStatusChecker(_helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_segmentStatusChecker);
- _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config);
+ _realtimeSegmentRelocator = new RealtimeSegmentRelocator(_helixResourceManager, _config, _controllerMetrics);
periodicTasks.add(_realtimeSegmentRelocator);
return periodicTasks;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index 3cb1ee4..84bf706 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -45,7 +45,6 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
public static final String ONLINE = "ONLINE";
public static final String ERROR = "ERROR";
public static final String CONSUMING = "CONSUMING";
- private final ControllerMetrics _metricsRegistry;
private final int _waitForPushTimeSeconds;
// log messages about disabled tables atmost once a day
@@ -63,12 +62,11 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
* @param config The controller configuration object
*/
public SegmentStatusChecker(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
- ControllerMetrics metricsRegistry) {
+ ControllerMetrics controllerMetrics) {
super("SegmentStatusChecker", config.getStatusCheckerFrequencyInSeconds(),
- config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getStatusCheckerInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
_waitForPushTimeSeconds = config.getStatusCheckerWaitForPushTimeInSeconds();
- _metricsRegistry = metricsRegistry;
}
@Override
@@ -99,6 +97,13 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
}
@Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
+ // Remove the metric for this table
+ resetTableMetrics(tableNameWithType);
+ }
+
+ @Override
protected void postprocess() {
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT, _realTimeTableCount);
_metricsRegistry.setValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT, _offlineTableCount);
@@ -113,138 +118,132 @@ public class SegmentStatusChecker extends ControllerPeriodicTask {
*/
private void updateSegmentMetrics(String tableNameWithType) {
- try {
- if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
- _offlineTableCount++;
- } else {
- _realTimeTableCount++;
- }
+ if (TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE) {
+ _offlineTableCount++;
+ } else {
+ _realTimeTableCount++;
+ }
- IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
- if (idealState == null) {
- LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
- resetTableMetrics(tableNameWithType);
- return;
- }
+ if (idealState == null) {
+ LOGGER.warn("Table {} has null ideal state. Skipping segment status checks", tableNameWithType);
+ resetTableMetrics(tableNameWithType);
+ return;
+ }
- if (!idealState.isEnabled()) {
- if (_logDisabledTables) {
- LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
- }
- resetTableMetrics(tableNameWithType);
- _disabledTableCount++;
- return;
+ if (!idealState.isEnabled()) {
+ if (_logDisabledTables) {
+ LOGGER.warn("Table {} is disabled. Skipping segment status checks", tableNameWithType);
}
+ resetTableMetrics(tableNameWithType);
+ _disabledTableCount++;
+ return;
+ }
- if (idealState.getPartitionSet().isEmpty()) {
- int nReplicasFromIdealState = 1;
- try {
- nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
- } catch (NumberFormatException e) {
- // Ignore
- }
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasFromIdealState);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
- return;
+ if (idealState.getPartitionSet().isEmpty()) {
+ int nReplicasFromIdealState = 1;
+ try {
+ nReplicasFromIdealState = Integer.valueOf(idealState.getReplicas());
+ } catch (NumberFormatException e) {
+ // Ignore
}
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS,
+ nReplicasFromIdealState);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS, 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE, 100);
+ return;
+ }
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
- idealState.toString().length());
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
- (long) (idealState.getPartitionSet().size()));
- ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.IDEALSTATE_ZNODE_SIZE,
+ idealState.toString().length());
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENT_COUNT,
+ (long) (idealState.getPartitionSet().size()));
+ ExternalView externalView = _pinotHelixResourceManager.getTableExternalView(tableNameWithType);
- int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
- int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
- int nErrors = 0; // Keeps track of number of segments in error state
- int nOffline = 0; // Keeps track of number segments with no online replicas
- int nSegments = 0; // Counts number of segments
- for (String partitionName : idealState.getPartitionSet()) {
- int nReplicas = 0;
- int nIdeal = 0;
- nSegments++;
- // Skip segments not online in ideal state
- for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
- if (serverAndState == null) {
- break;
- }
- if (serverAndState.getValue().equals(ONLINE)) {
- nIdeal++;
- break;
- }
+ int nReplicasIdealMax = 0; // Keeps track of maximum number of replicas in ideal state
+ int nReplicasExternal = -1; // Keeps track of minimum number of replicas in external view
+ int nErrors = 0; // Keeps track of number of segments in error state
+ int nOffline = 0; // Keeps track of number segments with no online replicas
+ int nSegments = 0; // Counts number of segments
+ for (String partitionName : idealState.getPartitionSet()) {
+ int nReplicas = 0;
+ int nIdeal = 0;
+ nSegments++;
+ // Skip segments not online in ideal state
+ for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) {
+ if (serverAndState == null) {
+ break;
}
- if (nIdeal == 0) {
- // No online segments in ideal state
- continue;
+ if (serverAndState.getValue().equals(ONLINE)) {
+ nIdeal++;
+ break;
}
- nReplicasIdealMax =
- (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
- partitionName).size() : nReplicasIdealMax;
- if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
- // No replicas for this segment
- TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
- OfflineSegmentZKMetadata segmentZKMetadata =
- _pinotHelixResourceManager.getOfflineSegmentZKMetadata(tableNameWithType, partitionName);
+ }
+ if (nIdeal == 0) {
+ // No online segments in ideal state
+ continue;
+ }
+ nReplicasIdealMax =
+ (idealState.getInstanceStateMap(partitionName).size() > nReplicasIdealMax) ? idealState.getInstanceStateMap(
+ partitionName).size() : nReplicasIdealMax;
+ if ((externalView == null) || (externalView.getStateMap(partitionName) == null)) {
+ // No replicas for this segment
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if ((tableType != null) && (tableType.equals(TableType.OFFLINE))) {
+ OfflineSegmentZKMetadata segmentZKMetadata =
+ _pinotHelixResourceManager.getOfflineSegmentZKMetadata(tableNameWithType, partitionName);
- if (segmentZKMetadata != null
- && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
- // push not yet finished, skip
- continue;
- }
- }
- nOffline++;
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
- }
- nReplicasExternal = 0;
- continue;
- }
- for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
- // Count number of online replicas. Ignore if state is CONSUMING.
- // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
- // So, ignore this combination. If a segment exists in this combination for a long time, we will get
- // low level-partition-not-consuming alert anyway.
- if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
- nReplicas++;
- }
- if (serverAndState.getValue().equals(ERROR)) {
- nErrors++;
+ if (segmentZKMetadata != null
+ && segmentZKMetadata.getPushTime() > System.currentTimeMillis() - _waitForPushTimeSeconds * 1000) {
+ // push not yet finished, skip
+ continue;
}
}
- if (nReplicas == 0) {
- if (nOffline < MaxOfflineSegmentsToLog) {
- LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
- }
- nOffline++;
+ nOffline++;
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no replicas", partitionName, tableNameWithType);
}
- nReplicasExternal =
- ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
- }
- if (nReplicasExternal == -1) {
- nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ nReplicasExternal = 0;
+ continue;
}
- // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
- (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
- _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
- (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
- if (nOffline > 0) {
- LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+ for (Map.Entry<String, String> serverAndState : externalView.getStateMap(partitionName).entrySet()) {
+ // Count number of online replicas. A replica in CONSUMING state is counted as online too.
+ // It is possible for a segment to be ONLINE in idealstate, and CONSUMING in EV for a short period of time.
+ // So, ignore this combination. If a segment exists in this combination for a long time, we will get
+ // low level-partition-not-consuming alert anyway.
+ if (serverAndState.getValue().equals(ONLINE) || serverAndState.getValue().equals(CONSUMING)) {
+ nReplicas++;
+ }
+ if (serverAndState.getValue().equals(ERROR)) {
+ nErrors++;
+ }
}
- if (nReplicasExternal < nReplicasIdealMax) {
- LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
- nReplicasIdealMax);
+ if (nReplicas == 0) {
+ if (nOffline < MaxOfflineSegmentsToLog) {
+ LOGGER.warn("Segment {} of table {} has no online replicas", partitionName, tableNameWithType);
+ }
+ nOffline++;
}
- } catch (Exception e) {
- LOGGER.error("Caught exception while updating segment status for table {}", tableNameWithType, e);
-
- // Remove the metric for this table
- resetTableMetrics(tableNameWithType);
+ nReplicasExternal =
+ ((nReplicasExternal > nReplicas) || (nReplicasExternal == -1)) ? nReplicas : nReplicasExternal;
+ }
+ if (nReplicasExternal == -1) {
+ nReplicasExternal = (nReplicasIdealMax == 0) ? 1 : 0;
+ }
+ // Synchronization provided by Controller Gauge to make sure that only one thread updates the gauge
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.NUMBER_OF_REPLICAS, nReplicasExternal);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_OF_REPLICAS,
+ (nReplicasIdealMax > 0) ? (nReplicasExternal * 100 / nReplicasIdealMax) : 100);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.SEGMENTS_IN_ERROR_STATE, nErrors);
+ _metricsRegistry.setValueOfTableGauge(tableNameWithType, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE,
+ (nSegments > 0) ? (100 - (nOffline * 100 / nSegments)) : 100);
+ if (nOffline > 0) {
+ LOGGER.warn("Table {} has {} segments with no online replicas", tableNameWithType, nOffline);
+ }
+ if (nReplicasExternal < nReplicasIdealMax) {
+ LOGGER.warn("Table {} has {} replicas, below replication threshold :{}", tableNameWithType, nReplicasExternal,
+ nReplicasIdealMax);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 2052f77..ed3bad6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -50,7 +50,6 @@ public class PinotTaskManager extends ControllerPeriodicTask {
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
private final ClusterInfoProvider _clusterInfoProvider;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
- private final ControllerMetrics _controllerMetrics;
private Map<String, List<TableConfig>> _enabledTableConfigMap;
private Set<String> _taskTypes;
@@ -61,11 +60,10 @@ public class PinotTaskManager extends ControllerPeriodicTask {
@Nonnull PinotHelixResourceManager helixResourceManager, @Nonnull ControllerConf controllerConf,
@Nonnull ControllerMetrics controllerMetrics) {
super("PinotTaskManager", controllerConf.getTaskManagerFrequencyInSeconds(),
- controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager);
+ controllerConf.getPeriodicTaskInitialDelayInSeconds(), helixResourceManager, controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
_clusterInfoProvider = new ClusterInfoProvider(helixResourceManager, helixTaskResourceManager, controllerConf);
_taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
- _controllerMetrics = controllerMetrics;
}
@Override
@@ -104,7 +102,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
- _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
+ _metricsRegistry.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);
_taskTypes = _taskGeneratorRegistry.getAllTaskTypes();
_numTaskTypes = _taskTypes.size();
@@ -134,6 +132,11 @@ public class PinotTaskManager extends ControllerPeriodicTask {
}
@Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in PinotTaskManager for table {}", tableNameWithType, e);
+ }
+
+ @Override
protected void postprocess() {
// Generate each type of tasks
_tasksScheduled = new HashMap<>(_numTaskTypes);
@@ -147,7 +150,7 @@ public class PinotTaskManager extends ControllerPeriodicTask {
pinotTaskConfigs);
_tasksScheduled.put(taskType, _helixTaskResourceManager.submitTask(pinotTaskConfigs,
pinotTaskGenerator.getNumConcurrentTasksPerInstance()));
- _controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
+ _metricsRegistry.addMeteredTableValue(taskType, ControllerMeter.NUMBER_TASKS_SUBMITTED, numTasks);
}
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
index 40cd982..e7ddbb3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTask.java
@@ -20,6 +20,8 @@ package org.apache.pinot.controller.helix.core.periodictask;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.periodictask.BasePeriodicTask;
import org.slf4j.Logger;
@@ -33,18 +35,19 @@ import org.slf4j.LoggerFactory;
public abstract class ControllerPeriodicTask extends BasePeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ControllerPeriodicTask.class);
-
private static final long MAX_CONTROLLER_PERIODIC_TASK_STOP_TIME_MILLIS = 30_000L;
protected final PinotHelixResourceManager _pinotHelixResourceManager;
+ protected final ControllerMetrics _metricsRegistry;
private volatile boolean _stopPeriodicTask;
private volatile boolean _periodicTaskInProgress;
public ControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager) {
+ PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
super(taskName, runFrequencyInSeconds, initialDelayInSeconds);
_pinotHelixResourceManager = pinotHelixResourceManager;
+ _metricsRegistry = controllerMetrics;
}
/**
@@ -118,16 +121,28 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
*/
protected void process(List<String> tableNamesWithType) {
if (!shouldStopPeriodicTask()) {
+
+ int numTablesProcessed = 0;
preprocess();
+
for (String tableNameWithType : tableNamesWithType) {
if (shouldStopPeriodicTask()) {
LOGGER.info("Skip processing table {} and all the remaining tables for task {}.", tableNameWithType,
getTaskName());
break;
}
- processTable(tableNameWithType);
+ try {
+ processTable(tableNameWithType);
+ numTablesProcessed++;
+ } catch (Exception e) {
+ exceptionHandler(tableNameWithType, e);
+ }
}
+
postprocess();
+ _metricsRegistry.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, getTaskName(),
+ numTablesProcessed);
+
} else {
LOGGER.info("Skip processing all tables for task {}", getTaskName());
}
@@ -149,6 +164,8 @@ public abstract class ControllerPeriodicTask extends BasePeriodicTask {
*/
protected abstract void postprocess();
+ protected abstract void exceptionHandler(String tableNameWithType, Exception e);
+
@VisibleForTesting
protected boolean shouldStopPeriodicTask() {
return _stopPeriodicTask;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index a5a81de..5cbcb48 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -33,6 +33,8 @@ import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.common.utils.time.TimeUtils;
@@ -54,9 +56,10 @@ import org.slf4j.LoggerFactory;
public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
- public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+ public RealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+ ControllerMetrics controllerMetrics) {
super("RealtimeSegmentRelocator", getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
}
@Override
@@ -66,17 +69,23 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
@Override
protected void preprocess() {
-
}
@Override
protected void processTable(String tableNameWithType) {
- runRelocation(tableNameWithType);
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
+ runRelocation(tableNameWithType);
+ }
}
@Override
protected void postprocess() {
+ }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
}
/**
@@ -87,38 +96,30 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
* @param tableNameWithType
*/
private void runRelocation(String tableNameWithType) {
- // Only consider realtime tables.
- if (!TableNameBuilder.REALTIME.tableHasTypeSuffix(tableNameWithType)) {
+ LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
+
+ TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
+ final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
+ if (!realtimeTagConfig.isRelocateCompletedSegments()) {
+ LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
return;
}
- try {
- LOGGER.info("Starting relocation of segments for table: {}", tableNameWithType);
- TableConfig tableConfig = _pinotHelixResourceManager.getRealtimeTableConfig(tableNameWithType);
- final RealtimeTagConfig realtimeTagConfig = new RealtimeTagConfig(tableConfig);
- if (!realtimeTagConfig.isRelocateCompletedSegments()) {
- LOGGER.info("Skipping relocation of segments for {}", tableNameWithType);
- return;
- }
-
- Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
- @Nullable
- @Override
- public IdealState apply(@Nullable IdealState idealState) {
- if (!idealState.isEnabled()) {
- LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
- return null;
- }
- relocateSegments(realtimeTagConfig, idealState);
- return idealState;
+ Function<IdealState, IdealState> updater = new Function<IdealState, IdealState>() {
+ @Nullable
+ @Override
+ public IdealState apply(@Nullable IdealState idealState) {
+ if (!idealState.isEnabled()) {
+ LOGGER.info("Skipping relocation of segments for {} since ideal state is disabled", tableNameWithType);
+ return null;
}
- };
+ relocateSegments(realtimeTagConfig, idealState);
+ return idealState;
+ }
+ };
- HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
- RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
- } catch (Exception e) {
- LOGGER.error("Exception in relocating realtime segments of table {}", tableNameWithType, e);
- }
+ HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), tableNameWithType, updater,
+ RetryPolicies.exponentialBackoffRetryPolicy(5, 1000, 2.0f));
}
/**
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 25a2db8..53f9b35 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -30,6 +30,7 @@ import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.SegmentName;
@@ -53,9 +54,10 @@ public class RetentionManager extends ControllerPeriodicTask {
private final int _deletedSegmentsRetentionInDays;
- public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
+ public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+ ControllerMetrics controllerMetrics) {
super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
_deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
@@ -69,17 +71,12 @@ public class RetentionManager extends ControllerPeriodicTask {
@Override
protected void preprocess() {
-
}
@Override
protected void processTable(String tableNameWithType) {
- try {
- LOGGER.info("Start managing retention for table: {}", tableNameWithType);
- manageRetentionForTable(tableNameWithType);
- } catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
- }
+ LOGGER.info("Start managing retention for table: {}", tableNameWithType);
+ manageRetentionForTable(tableNameWithType);
}
@Override
@@ -88,39 +85,41 @@ public class RetentionManager extends ControllerPeriodicTask {
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
}
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
+ }
+
private void manageRetentionForTable(String tableNameWithType) {
- try {
- // Build retention strategy from table config
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
- return;
- }
- SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
- String segmentPushType = validationConfig.getSegmentPushType();
- if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
- LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
- return;
- }
- String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
- String retentionTimeValue = validationConfig.getRetentionTimeValue();
- RetentionStrategy retentionStrategy;
- try {
- retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
- Long.parseLong(retentionTimeValue));
- } catch (Exception e) {
- LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
- return;
- }
- // Scan all segment ZK metadata and purge segments if necessary
- if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
- manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
- } else {
- manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
- }
+ // Build retention strategy from table config
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.error("Failed to get table config for table: {}", tableNameWithType);
+ return;
+ }
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String segmentPushType = validationConfig.getSegmentPushType();
+ if (!"APPEND".equalsIgnoreCase(segmentPushType)) {
+ LOGGER.info("Segment push type is not APPEND for table: {}, skip", tableNameWithType);
+ return;
+ }
+ String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
+ String retentionTimeValue = validationConfig.getRetentionTimeValue();
+ RetentionStrategy retentionStrategy;
+ try {
+ retentionStrategy = new TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
+ Long.parseLong(retentionTimeValue));
} catch (Exception e) {
- LOGGER.error("Caught exception while managing retention for table: {}", tableNameWithType, e);
+ LOGGER.warn("Invalid retention time: {} {} for table: {}, skip", retentionTimeUnit, retentionTimeValue);
+ return;
+ }
+
+ // Scan all segment ZK metadata and purge segments if necessary
+ if (TableNameBuilder.OFFLINE.tableHasTypeSuffix(tableNameWithType)) {
+ manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+ } else {
+ manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
index c969ab1..c71a4ab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/BrokerResourceValidationManager.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Set;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
@@ -37,9 +38,10 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
private List<InstanceConfig> _instanceConfigs;
- public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager) {
+ public BrokerResourceValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
+ ControllerMetrics controllerMetrics) {
super("BrokerResourceValidationManager", config.getBrokerResourceValidationFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
}
@Override
@@ -49,26 +51,26 @@ public class BrokerResourceValidationManager extends ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
- return;
- }
-
- // Rebuild broker resource
- Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
- tableConfig.getTenantConfig().getBroker());
- _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping broker resource validation", tableNameWithType);
+ return;
}
+
+ // Rebuild broker resource
+ Set<String> brokerInstances = _pinotHelixResourceManager.getAllInstancesForBrokerTenant(_instanceConfigs,
+ tableConfig.getTenantConfig().getBroker());
+ _pinotHelixResourceManager.rebuildBrokerResource(tableNameWithType, brokerInstances);
}
@Override
protected void postprocess() {
+ }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating broker resource for table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index 14b9827..ca9563b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.time.TimeUtils;
@@ -48,9 +49,9 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
private final ValidationMetrics _validationMetrics;
public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
- ValidationMetrics validationMetrics) {
+ ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
_validationMetrics = validationMetrics;
}
@@ -60,21 +61,15 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
- CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.OFFLINE) {
-
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
- return;
- }
-
- validateOfflineSegmentPush(tableConfig);
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+ return;
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
+ validateOfflineSegmentPush(tableConfig);
}
}
@@ -212,7 +207,11 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask {
@Override
protected void postprocess() {
+ }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.warn("Caught exception while checking offline segment intervals for table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 245437f..a43d63b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.HLCSegmentName;
@@ -53,9 +54,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
private boolean _updateRealtimeDocumentCount;
public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
- PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics) {
+ PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager, ValidationMetrics validationMetrics,
+ ControllerMetrics controllerMetrics) {
super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
- config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager);
+ config.getPeriodicTaskInitialDelayInSeconds(), pinotHelixResourceManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
@@ -78,28 +80,24 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
@Override
protected void processTable(String tableNameWithType) {
- try {
- CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == CommonConstants.Helix.TableType.REALTIME) {
-
- TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
- return;
- }
+ CommonConstants.Helix.TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ if (tableType == CommonConstants.Helix.TableType.REALTIME) {
- if (_updateRealtimeDocumentCount) {
- updateRealtimeDocumentCount(tableConfig);
- }
+ TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping validation", tableNameWithType);
+ return;
+ }
- Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
- StreamConfig streamConfig = new StreamConfig(streamConfigMap);
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
- }
+ if (_updateRealtimeDocumentCount) {
+ updateRealtimeDocumentCount(tableConfig);
+ }
+
+ Map<String, String> streamConfigMap = tableConfig.getIndexingConfig().getStreamConfigs();
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig);
}
- } catch (Exception e) {
- LOGGER.warn("Caught exception while validating realtime table: {}", tableNameWithType, e);
}
}
@@ -151,7 +149,11 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask {
@Override
protected void postprocess() {
+ }
+ @Override
+ protected void exceptionHandler(String tableNameWithType, Exception e) {
+ LOGGER.error("Caught exception while validating realtime table: {}", tableNameWithType, e);
}
@Override
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index 29a96de..e20cdc8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -18,11 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.periodictask;
+import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.testng.annotations.BeforeTest;
@@ -40,14 +43,16 @@ public class ControllerPeriodicTaskTest {
private final ControllerConf _controllerConf = new ControllerConf();
private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
+ private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
private final AtomicBoolean _initTaskCalled = new AtomicBoolean();
private final AtomicBoolean _processCalled = new AtomicBoolean();
- private final AtomicInteger _numTablesProcessed = new AtomicInteger();
+ private final AtomicInteger _tablesProcessed = new AtomicInteger();
private final int _numTables = 3;
+ private static final String TASK_NAME = "TestTask";
- private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask("TestTask", RUN_FREQUENCY_IN_SECONDS,
- _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager) {
+ private final MockControllerPeriodicTask _task = new MockControllerPeriodicTask(TASK_NAME, RUN_FREQUENCY_IN_SECONDS,
+ _controllerConf.getPeriodicTaskInitialDelayInSeconds(), _resourceManager, _controllerMetrics) {
@Override
protected void initTask() {
@@ -67,7 +72,7 @@ public class ControllerPeriodicTaskTest {
@Override
public void processTable(String tableNameWithType) {
- _numTablesProcessed.getAndIncrement();
+ _tablesProcessed.getAndIncrement();
}
};
@@ -82,7 +87,8 @@ public class ControllerPeriodicTaskTest {
_initTaskCalled.set(false);
_stopTaskCalled.set(false);
_processCalled.set(false);
- _numTablesProcessed.set(0);
+ _tablesProcessed.set(0);
+ _controllerMetrics.setValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME,0);
}
@Test
@@ -102,16 +108,20 @@ public class ControllerPeriodicTaskTest {
_task.init();
assertTrue(_initTaskCalled.get());
assertFalse(_processCalled.get());
- assertEquals(_numTablesProcessed.get(), 0);
+ assertEquals(_tablesProcessed.get(), 0);
assertFalse(_stopTaskCalled.get());
assertFalse(_task.shouldStopPeriodicTask());
+ assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+ 0);
// run task - leadership gained
resetState();
_task.run();
assertFalse(_initTaskCalled.get());
assertTrue(_processCalled.get());
- assertEquals(_numTablesProcessed.get(), _numTables);
+ assertEquals(_tablesProcessed.get(), _numTables);
+ assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+ _numTables);
assertFalse(_stopTaskCalled.get());
assertFalse(_task.shouldStopPeriodicTask());
@@ -120,7 +130,9 @@ public class ControllerPeriodicTaskTest {
_task.stop();
assertFalse(_initTaskCalled.get());
assertFalse(_processCalled.get());
- assertEquals(_numTablesProcessed.get(), 0);
+ assertEquals(_tablesProcessed.get(), 0);
+ assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+ 0);
assertTrue(_stopTaskCalled.get());
assertTrue(_task.shouldStopPeriodicTask());
@@ -130,7 +142,9 @@ public class ControllerPeriodicTaskTest {
assertFalse(_task.shouldStopPeriodicTask());
assertFalse(_initTaskCalled.get());
assertTrue(_processCalled.get());
- assertEquals(_numTablesProcessed.get(), _numTables);
+ assertEquals(_tablesProcessed.get(), _numTables);
+ assertEquals(_controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED, TASK_NAME),
+ _numTables);
assertFalse(_stopTaskCalled.get());
}
@@ -138,8 +152,8 @@ public class ControllerPeriodicTaskTest {
private class MockControllerPeriodicTask extends ControllerPeriodicTask {
public MockControllerPeriodicTask(String taskName, long runFrequencyInSeconds, long initialDelayInSeconds,
- PinotHelixResourceManager pinotHelixResourceManager) {
- super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager);
+ PinotHelixResourceManager pinotHelixResourceManager, ControllerMetrics controllerMetrics) {
+ super(taskName, runFrequencyInSeconds, initialDelayInSeconds, pinotHelixResourceManager, controllerMetrics);
}
@Override
@@ -149,7 +163,6 @@ public class ControllerPeriodicTaskTest {
@Override
protected void preprocess() {
-
}
@Override
@@ -159,9 +172,12 @@ public class ControllerPeriodicTaskTest {
@Override
public void postprocess() {
-
}
+ @Override
+ public void exceptionHandler(String tableNameWithType, Exception e) {
+
+ }
@Override
public void stopTask() {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 7271ca8..4dedc43 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.relocation;
import com.google.common.collect.Lists;
+import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.pinot.common.config.RealtimeTagConfig;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
@@ -68,7 +70,9 @@ public class RealtimeSegmentRelocatorTest {
_mockHelixManager = mock(HelixManager.class);
when(mockPinotHelixResourceManager.getHelixZkManager()).thenReturn(_mockHelixManager);
ControllerConf controllerConfig = new ControllerConf();
- _realtimeSegmentRelocator = new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig);
+ ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+ _realtimeSegmentRelocator =
+ new TestRealtimeSegmentRelocator(mockPinotHelixResourceManager, controllerConfig, controllerMetrics);
final int maxInstances = 20;
serverNames = new String[maxInstances];
@@ -261,8 +265,9 @@ public class RealtimeSegmentRelocatorTest {
private Map<String, List<String>> tagToInstances;
- public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config) {
- super(pinotHelixResourceManager, config);
+ public TestRealtimeSegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, ControllerConf config,
+ ControllerMetrics controllerMetrics) {
+ super(pinotHelixResourceManager, config, controllerMetrics);
tagToInstances = new HashedMap();
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index a171505..e569439 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.retention;
+import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -29,6 +30,7 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -86,9 +88,10 @@ public class RetentionManagerTest {
when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
ControllerConf conf = new ControllerConf();
+ ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
retentionManager.init();
retentionManager.run();
@@ -206,9 +209,10 @@ public class RetentionManagerTest {
setupPinotHelixResourceManager(tableConfig, removedSegments, pinotHelixResourceManager);
ControllerConf conf = new ControllerConf();
+ ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
conf.setRetentionControllerFrequencyInSeconds(0);
conf.setDeletedSegmentsRetentionInDays(0);
- RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf);
+ RetentionManager retentionManager = new RetentionManager(pinotHelixResourceManager, conf, controllerMetrics);
retentionManager.init();
retentionManager.run();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
index deefc4a..9966840 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
@@ -40,6 +40,8 @@ import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -52,13 +54,18 @@ import org.testng.annotations.Test;
*/
public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStatusCheckerIntegrationTest.class);
+
private String emptyTable = "table1_OFFLINE";
private String disabledOfflineTable = "table2_OFFLINE";
private String basicOfflineTable = "table3_OFFLINE";
private String errorOfflineTable = "table4_OFFLINE";
private String realtimeTableErrorState = "table5_REALTIME";
+ private String _currentTableName;
+ private static final int NUM_TABLES = 5;
private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
+ private static final int SEGMENT_STATUS_CHECKER_FREQ_SECONDS = 5;
@BeforeClass
public void setUp() throws Exception {
@@ -67,14 +74,13 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
startZk();
// Set initial delay of 60 seconds for the segment status checker, to allow time for tables setup.
- // By default, it will pick a random delay between 120s and 300s
+ // Run at 5 seconds freq in order to keep it running, in case first run happens before table setup
ControllerConf controllerConf = getDefaultControllerConfiguration();
controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
+ controllerConf.setStatusCheckerFrequencyInSeconds(SEGMENT_STATUS_CHECKER_FREQ_SECONDS);
startController(controllerConf);
-
startBroker();
-
startServers(3);
// empty table
@@ -107,9 +113,6 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
// realtime table with segments in error state
setupRealtimeTable(realtimeTableErrorState);
-
- // we need to wait for SegmentStatusChecker to finish at least 1 run
- Thread.sleep(TimeUnit.MILLISECONDS.convert(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS + 10, TimeUnit.SECONDS));
}
private void setupOfflineTable(String table) throws Exception {
@@ -119,6 +122,8 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
}
private void setupOfflineTableAndSegments(String table) throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+ setTableName(table);
_realtimeTableConfig = null;
addOfflineTable(table);
completeTableConfiguration();
@@ -129,6 +134,8 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
uploadSegments(_tarDir);
+
+ waitForAllDocsLoaded(600_000L);
}
private void setupRealtimeTable(String table) throws Exception {
@@ -151,6 +158,14 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
completeTableConfiguration();
}
+ @Override
+ public String getTableName() {
+ return _currentTableName;
+ }
+
+ private void setTableName(String tableName) {
+ _currentTableName = tableName;
+ }
/**
* After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
* Validate that we are seeing the expected numbers
@@ -159,6 +174,20 @@ public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationT
public void testSegmentStatusChecker() {
ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
+ long millisToWait = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
+ while (controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+ "SegmentStatusChecker") < NUM_TABLES && millisToWait > 0) {
+ try {
+ Thread.sleep(1000);
+ millisToWait -= 1000;
+ } catch (InterruptedException e) {
+ LOGGER.info("Interrupted while waiting for SegmentStatusChecker");
+ }
+ }
+
+ Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+ "SegmentStatusChecker"), NUM_TABLES);
+
// empty table - table1_OFFLINE
// num replicas set from ideal state
Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.NUMBER_OF_REPLICAS), 3);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org