You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2024/02/21 21:53:54 UTC
(pinot) branch master updated: Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e161b786f5 Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)
e161b786f5 is described below
commit e161b786f5e4426fcfee1d788d7def46b1d8c24b
Author: Pratik Tibrewal <ti...@uber.com>
AuthorDate: Thu Feb 22 03:23:48 2024 +0530
Schedule segments with higher invalidDocs first in Upsert Compaction task (#12461)
---
.../UpsertCompactionTaskGenerator.java | 18 +++++++++++++++---
.../UpsertCompactionTaskGeneratorTest.java | 22 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index de24b8a5a6..76f5c7d939 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -212,7 +213,7 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
long invalidRecordsThresholdCount = Long.parseLong(
taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
- List<SegmentZKMetadata> segmentsForCompaction = new ArrayList<>();
+ List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
List<String> segmentsForDeletion = new ArrayList<>();
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoList) {
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
@@ -237,10 +238,21 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
segmentsForDeletion.add(segment.getSegmentName());
} else if (invalidRecordPercent > invalidRecordsThresholdPercent
&& totalInvalidDocs > invalidRecordsThresholdCount) {
- segmentsForCompaction.add(segment);
+ segmentsForCompaction.add(Pair.of(segment, totalInvalidDocs));
}
}
- return new SegmentSelectionResult(segmentsForCompaction, segmentsForDeletion);
+ segmentsForCompaction.sort((o1, o2) -> {
+ if (o1.getValue() > o2.getValue()) {
+ return -1;
+ } else if (o1.getValue().equals(o2.getValue())) {
+ return 0;
+ }
+ return 1;
+ });
+
+ return new SegmentSelectionResult(
+ segmentsForCompaction.stream().map(Map.Entry::getKey).collect(Collectors.toList()),
+ segmentsForDeletion);
}
@VisibleForTesting
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 5aacba1f93..971a288f82 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -259,6 +259,7 @@ public class UpsertCompactionTaskGeneratorTest {
UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap,
validDocIdsMetadataInfo);
assertTrue(segmentSelectionResult.getSegmentsForCompaction().isEmpty());
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
// test without an invalidRecordsThresholdPercent
compactionConfigs = getCompactionConfigs("0", "10");
@@ -267,6 +268,7 @@ public class UpsertCompactionTaskGeneratorTest {
validDocIdsMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
// test without a invalidRecordsThresholdCount
compactionConfigs = getCompactionConfigs("30", "0");
@@ -275,6 +277,7 @@ public class UpsertCompactionTaskGeneratorTest {
validDocIdsMetadataInfo);
assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
_completedSegment.getSegmentName());
+ assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0), _completedSegment2.getSegmentName());
// Test the case where the completedSegment from api has different crc than segment from zk metadata.
json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
@@ -296,6 +299,25 @@ public class UpsertCompactionTaskGeneratorTest {
Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 1);
assertEquals(segmentSelectionResult.getSegmentsForDeletion().get(0),
_completedSegment2.getSegmentName());
+
+ // check that both compaction candidates are returned sorted by invalid doc count, in descending order
+ json = "[{" + "\"totalValidDocs\" : 50," + "\"totalInvalidDocs\" : 50," + "\"segmentName\" : \""
+ + _completedSegment.getSegmentName() + "\"," + "\"totalDocs\" : 100" + ", \"segmentCrc\": \""
+ + _completedSegment.getCrc() + "\"}," + "{" + "\"totalValidDocs\" : 10," + "\"totalInvalidDocs\" : 40,"
+ + "\"segmentName\" : \"" + _completedSegment2.getSegmentName() + "\", " + "\"segmentCrc\" : \""
+ + _completedSegment2.getCrc() + "\"," + "\"totalDocs\" : 50" + "}]";
+ validDocIdsMetadataInfo = JsonUtils.stringToObject(json, new TypeReference<ArrayList<ValidDocIdsMetadataInfo>>() {
+ });
+ compactionConfigs = getCompactionConfigs("30", "0");
+ segmentSelectionResult =
+ UpsertCompactionTaskGenerator.processValidDocIdsMetadata(compactionConfigs, _completedSegmentsMap,
+ validDocIdsMetadataInfo);
+ Assert.assertEquals(segmentSelectionResult.getSegmentsForCompaction().size(), 2);
+ Assert.assertEquals(segmentSelectionResult.getSegmentsForDeletion().size(), 0);
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(0).getSegmentName(),
+ _completedSegment.getSegmentName());
+ assertEquals(segmentSelectionResult.getSegmentsForCompaction().get(1).getSegmentName(),
+ _completedSegment2.getSegmentName());
}
private Map<String, String> getCompactionConfigs(String invalidRecordsThresholdPercent,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org