You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/10/23 03:07:32 UTC
[pinot] branch master updated: Use maxEndTimeMs for merge/roll-up delay metrics. (#7617)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7bcbda1 Use maxEndTimeMs for merge/roll-up delay metrics. (#7617)
7bcbda1 is described below
commit 7bcbda1680922b97aa1ccaecd3b5eeaf75a87a02
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Fri Oct 22 20:07:09 2021 -0700
Use maxEndTimeMs for merge/roll-up delay metrics. (#7617)
---
.../mergerollup/MergeRollupTaskGenerator.java | 40 ++++++++++++++--------
1 file changed, 26 insertions(+), 14 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index e81ee4e..3b59a6f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -110,21 +110,19 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
// number to be 7 and merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of
// delay. When operating merge/roll-up task in production, we should set the alert on this metrics to find out the
// delay. Setting the alert on 7 time buckets of delay would be a good starting point.
- //
- // NOTE: Based on the current scheduler logic, we are bumping up the watermark with some delay. (the current round
- // will bump up the watermark for the window that got processed from the previous round). Due to this, we will
- // correctly report the delay with one edge case. When we processed all available time windows, the watermark
- // will not get bumped up until we schedule some task for the table. Due to this, we will always see the delay >= 1.
private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets";
// tableNameWithType -> mergeLevel -> watermarkMs
private Map<String, Map<String, Long>> _mergeRollupWatermarks;
+ // tableNameWithType -> maxEndTime
+ private Map<String, Long> _tableMaxEndTimeMs;
private ClusterInfoAccessor _clusterInfoAccessor;
@Override
public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
_mergeRollupWatermarks = new HashMap<>();
+ _tableMaxEndTimeMs = new HashMap<>();
}
@Override
@@ -254,7 +252,12 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
long bucketEndMs = bucketStartMs + bucketMs;
// Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted
// but the metrics are not available until the controller schedules a valid task
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, watermarkMs, bufferMs, bucketMs);
+ long maxEndTimeMs = Long.MIN_VALUE;
+ for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
+ maxEndTimeMs = Math.max(maxEndTimeMs, preSelectedSegment.getEndTimeMs());
+ }
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxEndTimeMs,
+ bufferMs, bucketMs);
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
@@ -339,7 +342,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
watermarkMs, newWatermarkMs);
// Update the delay metrics
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, newWatermarkMs, bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, maxEndTimeMs,
+ bufferMs, bucketMs);
// Create task configs
int maxNumRecordsPerTask =
@@ -542,11 +546,13 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
return pinotTaskConfigs;
}
- private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
+ private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long maxEndTimeMsOfCurrentLevel,
+ long bufferTimeMs, long bucketTimeMs) {
if (watermarkMs == -1) {
return 0;
}
- return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs;
+ return (Math.min(System.currentTimeMillis() - bufferTimeMs, maxEndTimeMsOfCurrentLevel) - watermarkMs)
+ / bucketTimeMs;
}
/**
@@ -555,12 +561,14 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
*
* @param tableNameWithType table name with type
* @param mergeLevel merge level
+ * @param lowerMergeLevel lower merge level
* @param watermarkMs current watermark value
+ * @param maxEndTimeMs max end time of all the segments for the table
* @param bufferTimeMs buffer time
* @param bucketTimeMs bucket time
*/
- private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, long watermarkMs,
- long bufferTimeMs, long bucketTimeMs) {
+ private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, String lowerMergeLevel,
+ long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long bucketTimeMs) {
ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
if (controllerMetrics == null) {
return;
@@ -569,16 +577,20 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
// Update gauge value that indicates the delay in terms of the number of time buckets.
Map<String, Long> watermarkForTable =
_mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
+ _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs);
watermarkForTable.compute(mergeLevel, (k, v) -> {
if (v == null) {
LOGGER.info(
"Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
+ "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
- getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs));
+ getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, lowerMergeLevel == null
+ ? _tableMaxEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel),
+ bufferTimeMs, bucketTimeMs));
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
- (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs,
- bucketTimeMs)));
+ (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
+ lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableNameWithType)
+ : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs)));
}
return watermarkMs;
});
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org