You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/07 17:09:12 UTC

(pinot) branch master updated: Fixing the bug for Upsert compaction task generator (#12380)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f6ae06f499 Fixing the bug for Upsert compaction task generator (#12380)
f6ae06f499 is described below

commit f6ae06f499e2fc18934b1ee605d50b7d9006e46b
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Wed Feb 7 09:09:05 2024 -0800

    Fixing the bug for Upsert compaction task generator (#12380)
    
    - The task generator threw a NullPointerException when a segment
      reported in the validDocIds metadata was missing from the
      completed segments' ZK metadata. Such segments are now skipped
      with a warning instead of failing task generation.
    - Added a default value (SNAPSHOT) for the validDocIdsType API query parameter
    - Added unit tests
---
 .../api/resources/PinotTableRestletResource.java   |  3 +-
 .../UpsertCompactionTaskGenerator.java             | 27 ++++++++---
 .../UpsertCompactionTaskGeneratorTest.java         | 53 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 7 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index aedc6a7ef8..c0f11a412b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -963,7 +963,7 @@ public class PinotTableRestletResource {
       @ApiParam(value = "A list of segments", allowMultiple = true) @QueryParam("segmentNames")
       List<String> segmentNames,
       @ApiParam(value = "Valid doc ids type")
-      @QueryParam("validDocIdsType") ValidDocIdsType validDocIdsType) {
+      @QueryParam("validDocIdsType") @DefaultValue("SNAPSHOT") ValidDocIdsType validDocIdsType) {
     LOGGER.info("Received a request to fetch aggregate valid doc id metadata for a table {}", tableName);
     TableType tableType = Constants.validateTableType(tableTypeStr);
     if (tableType == TableType.OFFLINE) {
@@ -977,6 +977,7 @@ public class PinotTableRestletResource {
     try {
       TableMetadataReader tableMetadataReader =
           new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
+      validDocIdsType = (validDocIdsType == null) ? ValidDocIdsType.SNAPSHOT : validDocIdsType;
       JsonNode segmentsMetadataJson =
           tableMetadataReader.getAggregateValidDocIdsMetadata(tableNameWithType, segmentNames,
               validDocIdsType.toString(), _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 6d21d8417f..de24b8a5a6 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -60,9 +60,9 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
 
   public static class SegmentSelectionResult {
 
-    private List<SegmentZKMetadata> _segmentsForCompaction;
+    private final List<SegmentZKMetadata> _segmentsForCompaction;
 
-    private List<String> _segmentsForDeletion;
+    private final List<String> _segmentsForDeletion;
 
     SegmentSelectionResult(List<SegmentZKMetadata> segmentsForCompaction, List<String> segmentsForDeletion) {
       _segmentsForCompaction = segmentsForCompaction;
@@ -96,8 +96,17 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
       String tableNameWithType = tableConfig.getTableName();
       LOGGER.info("Start generating task configs for table: {}", tableNameWithType);
 
+      if (tableConfig.getTaskConfig() == null) {
+        LOGGER.warn("Task config is null for table: {}", tableNameWithType);
+        continue;
+      }
+
       Map<String, String> taskConfigs = tableConfig.getTaskConfig().getConfigsForTaskType(taskType);
-      List<SegmentZKMetadata> completedSegments = getCompletedSegments(tableNameWithType, taskConfigs);
+      List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+      // Get completed segments and filter out the segments based on the buffer time configuration
+      List<SegmentZKMetadata> completedSegments =
+          getCompletedSegments(taskConfigs, allSegments, System.currentTimeMillis());
 
       if (completedSegments.isEmpty()) {
         LOGGER.info("No completed segments were eligible for compaction for table: {}", tableNameWithType);
@@ -211,6 +220,11 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
 
       // Skip segments if the crc from zk metadata and server does not match. They may be being reloaded.
       SegmentZKMetadata segment = completedSegmentsMap.get(segmentName);
+      if (segment == null) {
+        LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
+        continue;
+      }
+
       if (segment.getCrc() != Long.parseLong(validDocIdsMetadata.getSegmentCrc())) {
         LOGGER.warn(
             "CRC mismatch for segment: {}, skipping it for compaction (segmentZKMetadata={}, validDocIdsMetadata={})",
@@ -229,15 +243,16 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
     return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion);
   }
 
-  private List<SegmentZKMetadata> getCompletedSegments(String tableNameWithType, Map<String, String> taskConfigs) {
+  @VisibleForTesting
+  public static List<SegmentZKMetadata> getCompletedSegments(Map<String, String> taskConfigs,
+      List<SegmentZKMetadata> allSegments, long currentTimeInMillis) {
     List<SegmentZKMetadata> completedSegments = new ArrayList<>();
     String bufferPeriod = taskConfigs.getOrDefault(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, DEFAULT_BUFFER_PERIOD);
     long bufferMs = TimeUtils.convertPeriodToMillis(bufferPeriod);
-    List<SegmentZKMetadata> allSegments = getSegmentsZKMetadataForTable(tableNameWithType);
     for (SegmentZKMetadata segment : allSegments) {
       CommonConstants.Segment.Realtime.Status status = segment.getStatus();
       // initial segments selection based on status and age
-      if (status.isCompleted() && (segment.getEndTimeMs() <= (System.currentTimeMillis() - bufferMs))) {
+      if (status.isCompleted() && (segment.getEndTimeMs() <= (currentTimeInMillis - bufferMs))) {
         completedSegments.add(segment);
       }
     }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index d74b03e815..5aacba1f93 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -181,6 +181,52 @@ public class UpsertCompactionTaskGeneratorTest {
     assertEquals(maxTasks, 10);
   }
 
+  @Test
+  public void testGetCompletedSegments() {
+    long currentTimeInMillis = System.currentTimeMillis();
+    Map<String, String> taskConfigs = new HashMap<>();
+    taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "1d");
+
+    SegmentZKMetadata metadata1 = new SegmentZKMetadata("testTable");
+    metadata1.setEndTime(1694198844776L);
+    metadata1.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata1.setTimeUnit(TimeUnit.MILLISECONDS);
+    SegmentZKMetadata metadata2 = new SegmentZKMetadata("testTable");
+    metadata2.setEndTime(1699639830678L);
+    metadata2.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata2.setTimeUnit(TimeUnit.MILLISECONDS);
+
+    SegmentZKMetadata metadata3 = new SegmentZKMetadata("testTable");
+    metadata3.setEndTime(currentTimeInMillis);
+    metadata3.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata3.setTimeUnit(TimeUnit.MILLISECONDS);
+
+    List<SegmentZKMetadata> segmentZKMetadataList = new ArrayList<>();
+    segmentZKMetadataList.add(metadata1);
+    segmentZKMetadataList.add(metadata2);
+    segmentZKMetadataList.add(metadata3);
+
+    List<SegmentZKMetadata> result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 2);
+
+    SegmentZKMetadata metadata4 = new SegmentZKMetadata("testTable");
+    metadata4.setEndTime(currentTimeInMillis - TimeUtils.convertPeriodToMillis("2d") + 1);
+    metadata4.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
+    metadata4.setTimeUnit(TimeUnit.MILLISECONDS);
+    segmentZKMetadataList.add(metadata4);
+
+    result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 3);
+
+    // Check the boundary condition for buffer time period based filtering
+    taskConfigs.put(UpsertCompactionTask.BUFFER_TIME_PERIOD_KEY, "2d");
+    result =
+        UpsertCompactionTaskGenerator.getCompletedSegments(taskConfigs, segmentZKMetadataList, currentTimeInMillis);
+    Assert.assertEquals(result.size(), 2);
+  }
+
   @Test
   public void testProcessValidDocIdsMetadata()
       throws IOException {
@@ -190,10 +236,17 @@ public class UpsertCompactionTaskGeneratorTest {
         + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 0," + "\"totalInvalidDocs\" : 10,"
         + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \""
         + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 10" + "}]";
+
     List<ValidDocIdsMetadataInfo> validDocIdsMetadataInfo =
         JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
         });
+
     UpsertCompactionTaskGenerator.SegmentSelectionResult segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, new HashMap<>(),
+            validDocIdsMetadataInfo);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 0);
+
+    segmentSelectionResult =
         UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap,
             validDocIdsMetadataInfo);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org