You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/11/23 16:23:17 UTC

[pinot] branch master updated: Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 767aa8a  Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
767aa8a is described below

commit 767aa8abfb5bf085ba0a7ae5ff4024679f27816e
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 23 08:22:53 2021 -0800

    Proceed to next time window when no segment match for RealtimeToOfflineTask (#7814)
---
 .../RealtimeToOfflineSegmentsTaskGenerator.java    | 97 ++++++++++++----------
 ...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 32 +++++++
 2 files changed, 83 insertions(+), 46 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index b715e55..7b0b490 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -119,9 +119,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
       Map<String, TaskState> incompleteTasks =
           TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName, _clusterInfoAccessor);
       if (!incompleteTasks.isEmpty()) {
-        LOGGER
-            .warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.", incompleteTasks.keySet(),
-                realtimeTableName);
+        LOGGER.warn("Found incomplete tasks: {} for same table: {}. Skipping task generation.",
+            incompleteTasks.keySet(), realtimeTableName);
         continue;
       }
 
@@ -132,16 +131,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
       getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, partitionToLatestCompletedSegmentName,
           allPartitions);
       if (completedSegmentsZKMetadata.isEmpty()) {
-        LOGGER
-            .info("No realtime-completed segments found for table: {}, skipping task generation: {}", realtimeTableName,
-                taskType);
+        LOGGER.info("No realtime-completed segments found for table: {}, skipping task generation: {}",
+            realtimeTableName, taskType);
         continue;
       }
       allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
       if (!allPartitions.isEmpty()) {
-        LOGGER
-            .info("Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
-                allPartitions, realtimeTableName, taskType);
+        LOGGER.info(
+            "Partitions: {} have no completed segments. Table: {} is not ready for {}. Skipping task generation.",
+            allPartitions, realtimeTableName, taskType);
         continue;
       }
 
@@ -163,47 +161,53 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
       long windowStartMs = getWatermarkMs(realtimeTableName, completedSegmentsZKMetadata, bucketMs);
       long windowEndMs = windowStartMs + bucketMs;
 
-      // Check that execution window is older than bufferTime
-      if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-        LOGGER.info(
-            "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
-                + "generation: {}",
-            windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
-        continue;
-      }
-
       // Find all COMPLETED segments with data overlapping execution window: windowStart (inclusive) to windowEnd
       // (exclusive)
       List<String> segmentNames = new ArrayList<>();
       List<String> downloadURLs = new ArrayList<>();
       Set<String> lastCompletedSegmentPerPartition = new HashSet<>(partitionToLatestCompletedSegmentName.values());
       boolean skipGenerate = false;
-      for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
-        String segmentName = segmentZKMetadata.getSegmentName();
-        long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
-        long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
-
-        // Check overlap with window
-        if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
-          // If last completed segment is being used, make sure that segment crosses over end of window.
-          // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
-          // would be skipped forever.
-          if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
-            LOGGER.info(
-                "Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
-                    + "generation: {}",
-                segmentName, taskType);
-            skipGenerate = true;
-            break;
+      while (true) {
+        // Check that execution window is older than bufferTime
+        if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+          LOGGER.info(
+              "Window with start: {} and end: {} is not older than buffer time: {} configured as {} ago. Skipping task "
+                  + "generation: {}", windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+          skipGenerate = true;
+          break;
+        }
+
+        for (SegmentZKMetadata segmentZKMetadata : completedSegmentsZKMetadata) {
+          String segmentName = segmentZKMetadata.getSegmentName();
+          long segmentStartTimeMs = segmentZKMetadata.getStartTimeMs();
+          long segmentEndTimeMs = segmentZKMetadata.getEndTimeMs();
+
+          // Check overlap with window
+          if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
+            // If last completed segment is being used, make sure that segment crosses over end of window.
+            // In the absence of this check, CONSUMING segments could contain some portion of the window. That data
+            // would be skipped forever.
+            if (lastCompletedSegmentPerPartition.contains(segmentName) && segmentEndTimeMs < windowEndMs) {
+              LOGGER.info("Window data overflows into CONSUMING segments for partition of segment: {}. Skipping task "
+                  + "generation: {}", segmentName, taskType);
+              skipGenerate = true;
+              break;
+            }
+            segmentNames.add(segmentName);
+            downloadURLs.add(segmentZKMetadata.getDownloadUrl());
           }
-          segmentNames.add(segmentName);
-          downloadURLs.add(segmentZKMetadata.getDownloadUrl());
         }
+        if (skipGenerate || !segmentNames.isEmpty()) {
+          break;
+        }
+
+        LOGGER.info("Found no eligible segments for task: {} with window [{} - {}), moving to the next time bucket",
+            taskType, windowStartMs, windowEndMs);
+        windowStartMs = windowEndMs;
+        windowEndMs += bucketMs;
       }
 
-      if (segmentNames.isEmpty() || skipGenerate) {
-        LOGGER.info("Found no eligible segments for task: {} with window [{} - {}). Skipping task generation", taskType,
-            windowStartMs, windowEndMs);
+      if (skipGenerate) {
         continue;
       }
 
@@ -264,8 +268,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
 
       if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
         completedSegmentsZKMetadata.add(segmentZKMetadata);
-        latestLLCSegmentNameMap
-            .compute(llcSegmentName.getPartitionGroupId(), (partitionGroupId, latestLLCSegmentName) -> {
+        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
+            (partitionGroupId, latestLLCSegmentName) -> {
               if (latestLLCSegmentName == null) {
                 return llcSegmentName;
               } else {
@@ -291,11 +295,12 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
    */
   private long getWatermarkMs(String realtimeTableName, List<SegmentZKMetadata> completedSegmentsZKMetadata,
       long bucketMs) {
-    ZNRecord realtimeToOfflineZNRecord = _clusterInfoAccessor
-        .getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, realtimeTableName);
+    ZNRecord realtimeToOfflineZNRecord =
+        _clusterInfoAccessor.getMinionTaskMetadataZNRecord(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            realtimeTableName);
     RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
-        realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata
-            .fromZNRecord(realtimeToOfflineZNRecord) : null;
+        realtimeToOfflineZNRecord != null ? RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(
+            realtimeToOfflineZNRecord) : null;
 
     if (realtimeToOfflineSegmentsTaskMetadata == null) {
       // No ZNode exists. Cold-start.
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index fc71b5b..000383b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
 
 import com.google.common.collect.Lists;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -403,6 +404,37 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
     assertEquals(pinotTaskConfigs.size(), 1);
   }
 
+  /**
+   * Tests that task generation skips empty time buckets when there is a time gap after the watermark.
+   */
+  @Test
+  public void testTimeGap() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>());
+    TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+    when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new HashMap<>());
+    when(mockClusterInfoProvide.getMinionTaskMetadataZNRecord(RealtimeToOfflineSegmentsTask.TASK_TYPE,
+        REALTIME_TABLE_NAME)).thenReturn(
+        new RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L).toZNRecord()); // 21 May 2020 UTC
+    SegmentZKMetadata segmentZKMetadata =
+        getSegmentZKMetadata("testTable__0__1__12345", Status.DONE, 1590220800000L, 1590307200000L,
+            TimeUnit.MILLISECONDS, "download2"); // 2020-05-23T08:00:00Z to 2020-05-24T08:00:00Z
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(REALTIME_TABLE_NAME)).thenReturn(
+        Collections.singletonList(segmentZKMetadata));
+
+    RealtimeToOfflineSegmentsTaskGenerator generator = new RealtimeToOfflineSegmentsTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+
+    // Buckets starting 21 and 22 May have no segments, so the task window is [23 May 2020 UTC, 24 May 2020 UTC)
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 1);
+    Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY), "1590192000000");
+    assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY), "1590278400000");
+  }
+
   @Test
   public void testBuffer() {
     Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org