You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/11/23 16:23:17 UTC
[pinot] branch master updated: Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 767aa8a Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
767aa8a is described below
commit 767aa8abfb5bf085ba0a7ae5ff4024679f27816e
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 23 08:22:53 2021 -0800
Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
---
.../RealtimeToOfflineSegmentsTaskGenerator.java | 97 ++++++++++++----------
...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 32 +++++++
2 files changed, 83 insertions(+), 46 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index b715e55..7b0b490 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -119,9 +119,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
Map<String, TaskState> incompleteTasks =
TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
if (!incompleteTasks.isEmpty()) {
- LOGGER
- .warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(),
- realtimeTableName);
+ LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
+ incompleteTasks.keySet(), realtimeTableName);
continue;
}
@@ -132,16 +131,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestCompletedSegmentName,
allPartitions);
if (completedSegmentsZKMetadata.isEmpty()) {
- LOGGER
- .info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
- taskType);
+ LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
+ realtimeTableName, taskType);
continue;
}
allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
if (!allPartitions.isEmpty()) {
- LOGGER
- .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
- allPartitions, realtimeTableName, taskType);
+ LOGGER.info(
+ "Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+ allPartitions, realtimeTableName, taskType);
continue;
}
@@ -163,47 +161,53 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
long windowEndMs = windowStartMs + bucketMs;
- // Check that execution window is older than bufferTime
- if (windowEndMs > System.currentTimeMillis() - bufferMs) {
- LOGGER.info(
- "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
- + "generation: {}",
- windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
- continue;
- }
-
// Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
// (exclusive)
List<String> segmentNames = new ArrayList<>();
List<String> downloadURLs = new ArrayList<>();
Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
boolean skipGenerate = false;
- for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
- String segmentName = segmentZKMetadata.getSegmentName();
- long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
- long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
- // Check overlap with window
- if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
- // If last completed segment is being used, make sure that segment crosses over end of window.
- // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
- // would be skipped forever.
- if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
- LOGGER.info(
- "Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
- + "generation: {}",
- segmentName, taskType);
- skipGenerate = true;
- break;
+ while (true) {
+ // Check that execution window is older than bufferTime
+ if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+ LOGGER.info(
+ "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+ + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+ skipGenerate = true;
+ break;
+ }
+
+ for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
+ long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
+
+ // Check overlap with window
+ if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
+ // If last completed segment is being used, make sure that segment crosses over end of window.
+ // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
+ // would be skipped forever.
+ if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
+ LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+ + "generation: {}", segmentName, taskType);
+ skipGenerate = true;
+ break;
+ }
+ segmentNames.add(segmentName);
+ downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
- segmentNames.add(segmentName);
- downloadURLs.add(segmentZKMetadata.getDownloadUrl());
}
+ if (skipGenerate || !segmentNames.isEmpty()) {
+ break;
+ }
+
+ LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
+ taskType, windowStartMs, windowEndMs);
+ windowStartMs = windowEndMs;
+ windowEndMs += bucketMs;
}
- if (segmentNames.isEmpty() || skipGenerate) {
- LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", taskType,
- windowStartMs, windowEndMs);
+ if (skipGenerate) {
continue;
}
@@ -264,8 +268,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
completedSegmentsZKMetadata.add(segmentZKMetadata);
- latestLLCSegmentNameMap
- .compute(llcSegmentName.getPartitionGroupId(), (partitionGroupId, latestLLCSegmentName) -> {
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
+ (partitionGroupId, latestLLCSegmentName) -> {
if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
@@ -291,11 +295,12 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
*/
private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
long bucketMs) {
- ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor
- .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
+ ZNRecord realtimeToOfflineZNRecord =
+ _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ realtimeTableName);
RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
- realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata
- .fromZNRecord(realtimeToOfflineZNRecord) : null;
+ realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
+ realtimeToOfflineZNRecord) : null;
if (realtimeToOfflineSegmentsTaskMetadata == null) {
// No ZNode exists. Cold-start.
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index fc71b5b..000383b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
import com.google.common.collect.Lists;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -403,6 +404,37 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
assertEquals(pinotTaskConfigs.size(), 1);
}
+ /**
+ * Tests for task generation when there is a time gap between segments.
+ */
+ @Test
+ public void testTimeGap() {
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+ when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+ when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ REALTIME_TABLE_NAME)).thenReturn(
+ new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC
+ SegmentZKMetadata segmentZKMetadata =
+ getSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590220800000L, 1590307200000L,
+ TimeUnit.MILLISECONDS, "download2"); // 05-23-2020T08:00:00 UTC to 05-24-2020T08:00:00 UTC
+ when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+ Collections.singletonList(segmentZKMetadata));
+
+ RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+ generator.init(mockClusterInfoProvide);
+
+ // Generated task should skip 2 days and have a time window of [23 May 2020 UTC, 24 May 2020 UTC)
+ List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590192000000");
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590278400000");
+ }
+
@Test
public void testBuffer() {
Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org