You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2021/12/02 02:27:23 UTC
[pinot] branch master updated: Use valid bucket end time instead of segment end time for merge/rollup delay metrics. This handles the corner case where the metric takes into consideration segments that are not yet ready to merge. (#7827)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 47e49ec Use valid bucket end time instead of segment end time for merge/rollup delay metrics. This handles the corner case where the metric takes into consideration segments that are not yet ready to merge. (#7827)
47e49ec is described below
commit 47e49ecd6e11aebe74f8868cdac22051b175d4c5
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Wed Dec 1 18:27:06 2021 -0800
Use valid bucket end time instead of segment end time for merge/rollup delay metrics. This handles the corner case where the metric takes into consideration segments that are not yet ready to merge. (#7827)
---
.../mergerollup/MergeRollupTaskGenerator.java | 55 +++++++++++++++-------
1 file changed, 37 insertions(+), 18 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index ece07be..1f81011 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -114,15 +114,15 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
// tableNameWithType -> mergeLevel -> watermarkMs
private Map<String, Map<String, Long>> _mergeRollupWatermarks;
- // tableNameWithType -> maxEndTime
- private Map<String, Long> _tableMaxEndTimeMs;
+ // tableNameWithType -> maxValidBucketEndTime
+ private Map<String, Long> _tableMaxValidBucketEndTimeMs;
private ClusterInfoAccessor _clusterInfoAccessor;
@Override
public void init(ClusterInfoAccessor clusterInfoAccessor) {
_clusterInfoAccessor = clusterInfoAccessor;
_mergeRollupWatermarks = new HashMap<>();
- _tableMaxEndTimeMs = new HashMap<>();
+ _tableMaxValidBucketEndTimeMs = new HashMap<>();
}
@Override
@@ -252,15 +252,13 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
long bucketEndMs = bucketStartMs + bucketMs;
// Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted
// but the metrics are not available until the controller schedules a valid task
- long maxEndTimeMs = Long.MIN_VALUE;
+ long maxValidBucketEndTimeMs = Long.MIN_VALUE;
for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
- long currentEndTimeMs = preSelectedSegment.getEndTimeMs();
- // Compute maxEndTimeMs among segments that are valid for merge
- if (currentEndTimeMs < System.currentTimeMillis() - bufferMs) {
- maxEndTimeMs = Math.max(maxEndTimeMs, currentEndTimeMs);
- }
+ // Compute maxValidBucketEndTimeMs among segments that are ready for merge
+ long currentValidBucketEndTimeMs = getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
+ maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
}
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxEndTimeMs,
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxValidBucketEndTimeMs,
bufferMs, bucketMs);
if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
@@ -346,8 +344,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
watermarkMs, newWatermarkMs);
// Update the delay metrics
- createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, maxEndTimeMs,
- bufferMs, bucketMs);
+ createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs,
+ maxValidBucketEndTimeMs, bufferMs, bucketMs);
// Create task configs
int maxNumRecordsPerTask =
@@ -440,6 +438,28 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
}
/**
+ * Get the valid bucket end time before the buffer (now - bufferMs). Treating the segment as contiguous time buckets
+ * of size bucketMs, returns the end time of the segment's last bucket that ends at or before now - bufferMs.
+ * Returns Long.MIN_VALUE if the segment's first bucket ends after now - bufferMs (i.e. not ready for merge yet).
+ */
+ private long getValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long bucketMs, long bufferMs) {
+ // Make sure the segment is ready for merge (the first bucket end <= now - bufferMs)
+ long currentTimeMs = System.currentTimeMillis();
+ long firstBucketEndTimeMs = segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs;
+ if (firstBucketEndTimeMs > currentTimeMs - bufferMs) {
+ return Long.MIN_VALUE;
+ }
+ // validBucketEndTime = min(segment end time rounded up to its bucket's exclusive end, now - bufferMs rounded
+ // down to a bucket boundary). Bucket end time is exclusive while segment end time is inclusive, e.g. if
+ // bucketTime = 1d, the rounded end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00, and the rounded end time
+ // of [10/1 00:00, 10/2 00:00] is 10/3 00:00.
+ long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs;
+ // Cap by the buffer (not the bucket size) so the result agrees with the readiness check above
+ validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - bufferMs) / bucketMs * bucketMs);
+ return validBucketEndTimeMs;
+ }
+
+ /**
* Check if the segment span multiple buckets
*/
private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long bucketMs) {
@@ -562,17 +582,16 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
/**
* Update the delay metrics for the given table and merge level. We create the new gauge metric if the metric is not
* available.
- *
* @param tableNameWithType table name with type
* @param mergeLevel merge level
* @param lowerMergeLevel lower merge level
* @param watermarkMs current watermark value
- * @param maxEndTimeMs max end time of all the segments for the table
+ * @param maxValidBucketEndTimeMs max valid bucket end time of all the segments for the table
* @param bufferTimeMs buffer time
* @param bucketTimeMs bucket time
*/
private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, String lowerMergeLevel,
- long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long bucketTimeMs) {
+ long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long bucketTimeMs) {
ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
if (controllerMetrics == null) {
return;
@@ -581,7 +600,7 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
// Update gauge value that indicates the delay in terms of the number of time buckets.
Map<String, Long> watermarkForTable =
_mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
- _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs);
+ _tableMaxValidBucketEndTimeMs.put(tableNameWithType, maxValidBucketEndTimeMs);
watermarkForTable.compute(mergeLevel, (k, v) -> {
if (v == null) {
LOGGER.info(
@@ -589,11 +608,11 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+ "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, lowerMergeLevel == null
- ? _tableMaxEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel),
+ ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel),
bufferTimeMs, bucketTimeMs));
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
(() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
- lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableNameWithType)
+ lowerMergeLevel == null ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType)
: watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs)));
}
return watermarkMs;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org