You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/21 21:53:54 UTC

(pinot) branch master updated: Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e161b786f5 Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)
e161b786f5 is described below

commit e161b786f5e4426fcfee1d788d7def46b1d8c24b
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Thu Feb 22 03:23:48 2024 +0530

    Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)
---
 .../UpsertCompactionTaskGenerator.java             | 18 +++++++++++++++---
 .../UpsertCompactionTaskGeneratorTest.java         | 22 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index de24b8a5a6..76f5c7d939 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -212,7 +213,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
     long invalidRecordsThresholdCount = Long.parseLong(
         taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
             String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
-    List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+    List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
     List<String> segmentsForDeletion = new ArrayList<>();
     for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) {
       long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
@@ -237,10 +238,21 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
         segmentsForDeletion.add(segment.getSegmentName());
       } else if (invalidRecordPercent > invalidRecordsThresholdPercent
           && totalInvalidDocs > invalidRecordsThresholdCount) {
-        segmentsForCompaction.add(segment);
+        segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
       }
     }
-    return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion);
+    segmentsForCompaction.sort((o1, o2) -> {
+      if (o1.getValue() > o2.getValue()) {
+        return -1;
+      } else if (o1.getValue().equals(o2.getValue())) {
+        return 0;
+      }
+      return 1;
+    });
+
+    return new SegmentSelectionResult(
+        segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
+        segmentsForDeletion);
   }
 
   @VisibleForTesting
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 5aacba1f93..971a288f82 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -259,6 +259,7 @@ public class UpsertCompactionTaskGeneratorTest {
         UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap,
             validDocIdsMetadataInfo);
     assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
 
     // test without an invalidRecordsThresholdPercent
     compactionConfigs = getCompactionConfigs("0", "10");
@@ -267,6 +268,7 @@ public class UpsertCompactionTaskGeneratorTest {
             validDocIdsMetadataInfo);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
 
     // test without a invalidRecordsThresholdCount
     compactionConfigs = getCompactionConfigs("30", "0");
@@ -275,6 +277,7 @@ public class UpsertCompactionTaskGeneratorTest {
             validDocIdsMetadataInfo);
     assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
         _completedSegment.getSegmentName());
+    assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
 
     // Test the case where the completedSegment from api has different crc than segment from zk metadata.
     json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
@@ -296,6 +299,25 @@ public class UpsertCompactionTaskGeneratorTest {
     Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
     assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
         _completedSegment2.getSegmentName());
+
+    // verify that both compaction candidates are returned in descending order of totalInvalidDocs
+    json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
+        + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \""
+        + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 10," + "\"totalInvalidDocs\" : 40,"
+        + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \""
+        + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]";
+    validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+    });
+    compactionConfigs = getCompactionConfigs("30", "0");
+    segmentSelectionResult =
+        UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap,
+            validDocIdsMetadataInfo);
+    Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 2);
+    Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0);
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+        _completedSegment.getSegmentName());
+    assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(1).getSegmentName(),
+        _completedSegment2.getSegmentName());
   }
 
   private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org