Posted to commits@druid.apache.org by fj...@apache.org on 2019/09/10 06:11:21 UTC
[incubator-druid] branch master updated: Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 762f4d0 Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)
762f4d0 is described below
commit 762f4d0e58b0f62c4225d978ab0d49509eb3598c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Sep 9 23:11:08 2019 -0700
Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)
* Check targetCompactionSizeBytes to search for candidate segments in auto compaction
* fix logs
* add javadoc
* rename
---
.../helper/NewestSegmentFirstIterator.java | 197 +++++++--------------
.../coordinator/helper/SegmentCompactorUtil.java | 27 ++-
.../DruidCoordinatorSegmentCompactorTest.java | 68 +++----
3 files changed, 122 insertions(+), 170 deletions(-)
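For readers skimming the diff below, the heart of the change: the candidate search now evaluates a whole time chunk at once, and also skips chunks whose segments are already close to targetCompactionSizeBytes. A minimal, self-contained sketch of that decision logic, with segment sizes reduced to plain longs; the class and method shapes here are illustrative, not the committed code:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class CandidateSearchSketch
{
  // Mirrors ALLOWED_ERROR_OF_SEGMENT_SIZE in the patch: segments within
  // +/-20% of the target count as already compacted.
  private static final double ALLOWED_ERROR = 0.2;

  // True when more than one segment falls outside the tolerance band;
  // a null target disables the check, as in the patch.
  static boolean needsCompaction(Long targetBytes, List<Long> sizes)
  {
    if (targetBytes == null) {
      return true;
    }
    final double min = targetBytes * (1 - ALLOWED_ERROR);
    final double max = targetBytes * (1 + ALLOWED_ERROR);
    return sizes.stream().filter(s -> s < min || s > max).count() > 1;
  }

  // Condensed shape of the new findSegmentsToCompact loop: pop time chunks
  // newest-first and return the first one that passes all three checks.
  static List<Long> findSegmentsToCompact(
      Iterator<List<Long>> cursor,
      long inputSegmentSize,
      int maxNumSegmentsToCompact,
      Long targetCompactionSizeBytes
  )
  {
    while (cursor.hasNext()) {
      final List<Long> candidates = cursor.next();
      final long totalSize = candidates.stream().mapToLong(Long::longValue).sum();
      if (totalSize <= inputSegmentSize
          && candidates.size() <= maxNumSegmentsToCompact
          && needsCompaction(targetCompactionSizeBytes, candidates)) {
        return candidates;
      }
      // Otherwise the patch logs a warning and falls through to the next,
      // older chunk.
    }
    return Collections.emptyList(); // like the empty SegmentsToCompact
  }
}

Note the "count() > 1" in needsCompaction: a single out-of-band segment is tolerated, presumably because one oddly sized segment has nothing to be merged with.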
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 6548db6..e0eecf1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -181,9 +182,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
/**
* Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
- * which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0.
+ * which means the holder always has at least two {@link DataSegment}s.
*/
- private static class CompactibleTimelineObjectHolderCursor
+ private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
{
private final List<TimelineObjectHolder<String, DataSegment>> holders;
@@ -200,7 +201,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.filter(holder -> {
final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
- return chunks.size() > 0
+ return chunks.size() > 1
&& partitionBytes > 0
&& interval.contains(chunks.get(0).getObject().getInterval());
})
@@ -208,32 +209,23 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
.collect(Collectors.toList());
}
- boolean hasNext()
+ @Override
+ public boolean hasNext()
{
return !holders.isEmpty();
}
- /**
- * Returns the latest holder.
- */
- @Nullable
- TimelineObjectHolder<String, DataSegment> get()
+ @Override
+ public List<DataSegment> next()
{
if (holders.isEmpty()) {
- return null;
- } else {
- return holders.get(holders.size() - 1);
- }
- }
-
- /**
- * Removes the latest holder, so that {@link #get()} returns the next one.
- */
- void next()
- {
- if (!holders.isEmpty()) {
- holders.remove(holders.size() - 1);
+ throw new NoSuchElementException();
}
+ return holders.remove(holders.size() - 1)
+ .getObject()
+ .stream()
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
}
}
@@ -254,103 +246,59 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
)
{
final long inputSegmentSize = config.getInputSegmentSizeBytes();
+ final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
- final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
// Finds segments to compact together while iterating timeline from latest to oldest
- while (compactibleTimelineObjectHolderCursor.hasNext()
- && segmentsToCompact.getTotalSize() < inputSegmentSize
- && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
- final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
- compactibleTimelineObjectHolderCursor.get(),
- "timelineObjectHolder"
+ while (compactibleTimelineObjectHolderCursor.hasNext()) {
+ final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+ final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
+ final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
+ final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
+ targetCompactionSizeBytes,
+ candidates.segments
);
- final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
- final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
- final boolean isSameOrAbuttingInterval;
- final Interval lastInterval = segmentsToCompact.getIntervalOfLastSegment();
- if (lastInterval == null) {
- isSameOrAbuttingInterval = true;
+ if (isCompactibleSize && isCompactibleNum && needsCompaction) {
+ return candidates;
} else {
- final Interval currentInterval = chunks.get(0).getObject().getInterval();
- isSameOrAbuttingInterval = currentInterval.isEqual(lastInterval) || currentInterval.abuts(lastInterval);
- }
-
- // The segments in a holder should be added all together or not at all.
- final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
- inputSegmentSize,
- segmentsToCompact.getTotalSize(),
- timeChunkSizeBytes
- );
- final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
- maxNumSegmentsToCompact,
- segmentsToCompact.getNumSegments(),
- chunks.size()
- );
- if (isCompactibleSize
- && isCompactibleNum
- && isSameOrAbuttingInterval
- && segmentsToCompact.isEmpty()) {
- chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
- } else {
- if (segmentsToCompact.getNumSegments() > 1) {
- // We found some segments to compact and cannot add more. End here.
- return segmentsToCompact;
- } else {
- if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
- final DataSegment segment = chunks.get(0).getObject();
- segmentsToCompact.clear();
- log.warn(
- "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
- + " Continue to the next shard.",
- timeChunkSizeBytes,
- segment.getDataSource(),
- segment.getInterval(),
- inputSegmentSize
- );
- } else if (maxNumSegmentsToCompact < chunks.size()) {
- final DataSegment segment = chunks.get(0).getObject();
- segmentsToCompact.clear();
- log.warn(
- "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
- + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
- + "segments, consider increasing 'numTargetCompactionSegments' and "
- + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.",
- chunks.size(),
- segment.getDataSource(),
- segment.getInterval(),
- maxNumSegmentsToCompact
- );
- } else {
- if (segmentsToCompact.getNumSegments() == 1) {
- // We found a segment which is smaller than targetCompactionSize but too large to compact with other
- // segments. Skip this one.
- segmentsToCompact.clear();
- chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
- } else {
- throw new ISE(
- "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
- + "with current segmentsToCompact[%s]",
- chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
- timeChunkSizeBytes,
- chunks.size(),
- segmentsToCompact
- );
- }
- }
+ if (!isCompactibleSize) {
+ log.warn(
+ "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ + " Continue to the next interval.",
+ candidates.getTotalSize(),
+ candidates.segments.get(0).getDataSource(),
+ candidates.segments.get(0).getInterval(),
+ inputSegmentSize
+ );
+ }
+ if (!isCompactibleNum) {
+ log.warn(
+ "Number of segments[%d] for datasource[%s] and interval[%s] is larger than "
+ + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
+ + "segments, consider increasing 'numTargetCompactionSegments' and "
+ + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.",
+ candidates.getNumSegments(),
+ candidates.segments.get(0).getDataSource(),
+ candidates.segments.get(0).getInterval(),
+ maxNumSegmentsToCompact
+ );
+ }
+ if (!needsCompaction) {
+ log.warn(
+ "Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "
+ + "for datasource[%s] and interval[%s]. Skipping compaction for this interval.",
+ candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()),
+ targetCompactionSizeBytes,
+ candidates.segments.get(0).getDataSource(),
+ candidates.segments.get(0).getInterval()
+ );
}
}
-
- compactibleTimelineObjectHolderCursor.next();
- }
-
- if (segmentsToCompact.getNumSegments() == 1) {
- // Don't compact a single segment
- segmentsToCompact.clear();
}
- return segmentsToCompact;
+ // Return an empty set if nothing is found
+ return new SegmentsToCompact();
}
/**
@@ -510,29 +458,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static class SegmentsToCompact
{
- private final List<DataSegment> segments = new ArrayList<>();
- private long totalSize;
-
- private void add(DataSegment segment)
- {
- segments.add(segment);
- totalSize += segment.getSize();
- }
+ private final List<DataSegment> segments;
+ private final long totalSize;
- private boolean isEmpty()
+ private SegmentsToCompact()
{
- Preconditions.checkState((totalSize == 0) == segments.isEmpty());
- return segments.isEmpty();
+ this(Collections.emptyList());
}
- @Nullable
- private Interval getIntervalOfLastSegment()
+ private SegmentsToCompact(List<DataSegment> segments)
{
- if (segments.isEmpty()) {
- return null;
- } else {
- return segments.get(segments.size() - 1).getInterval();
- }
+ this.segments = segments;
+ this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
}
private int getNumSegments()
@@ -545,12 +482,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
return totalSize;
}
- private void clear()
- {
- segments.clear();
- totalSize = 0;
- }
-
@Override
public String toString()
{
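The bespoke get()/next() pair in the hunks above is replaced by the standard java.util.Iterator contract, which is why next() now throws NoSuchElementException on exhaustion. A stripped-down sketch of the same newest-first consumption pattern (the class name is hypothetical, and the backing list must be mutable):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Pops elements from the tail of a list, mirroring how the cursor removes
// the latest TimelineObjectHolder on each call to next().
class TailFirstIterator<T> implements Iterator<T>
{
  private final List<T> items;

  TailFirstIterator(List<T> items)
  {
    this.items = items;
  }

  @Override
  public boolean hasNext()
  {
    return !items.isEmpty();
  }

  @Override
  public T next()
  {
    if (items.isEmpty()) {
      // Required by the Iterator contract, replacing the old @Nullable get().
      throw new NoSuchElementException();
    }
    return items.remove(items.size() - 1); // newest holders sit at the tail
  }
}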
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
index 08a651e..d68c3a0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
@@ -20,21 +20,36 @@
package org.apache.druid.server.coordinator.helper;
import com.google.common.base.Preconditions;
+import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
+import java.util.List;
+
/**
* Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
*/
class SegmentCompactorUtil
{
- static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes)
- {
- return currentTotalBytes + additionalBytes <= targetBytes;
- }
+ /**
+ * The allowed error rate of the segment size after compaction.
+ * Its value is determined experimentally.
+ */
+ private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;
- static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
+ static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List<DataSegment> candidates)
{
- return numCurrentSegments + numAdditionalSegments <= numTargetSegments;
+ if (targetCompactionSizeBytes == null) {
+ // If targetCompactionSizeBytes is null, we have no way to check whether the given segments need compaction.
+ return true;
+ }
+ final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
+ final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
+
+ return candidates
+ .stream()
+ .filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold)
+ .count() > 1;
}
/**
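To make the thresholds above concrete, a tiny runnable example with a hypothetical targetCompactionSizeBytes of 800,000,000 bytes (the figure is illustrative, not taken from the commit):

public class ToleranceBandExample
{
  public static void main(String[] args)
  {
    final long target = 800_000_000L;      // hypothetical targetCompactionSizeBytes
    final double min = target * (1 - 0.2); // 640,000,000 bytes
    final double max = target * (1 + 0.2); // 960,000,000 bytes
    System.out.printf("acceptance band: [%.0f, %.0f]%n", min, max);
  }
}

Four 100 MB segments would all fall below the 640 MB lower bound (four outliers, so the chunk is compacted), while segments of 700 MB and 850 MB both land inside the band (zero outliers, so the interval is skipped).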
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 0eb8b39..fcc1053 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -38,7 +38,6 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -79,18 +78,6 @@ public class DruidCoordinatorSegmentCompactorTest
segments.get(0).getInterval().getStart(),
segments.get(segments.size() - 1).getInterval().getEnd()
);
- DataSegment compactSegment = new DataSegment(
- segments.get(0).getDataSource(),
- compactInterval,
- "newVersion_" + compactVersionSuffix++,
- null,
- segments.get(0).getDimensions(),
- segments.get(0).getMetrics(),
- NoneShardSpec.instance(),
- 1,
- segments.stream().mapToLong(DataSegment::getSize).sum()
- );
-
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
segments.forEach(
segment -> timeline.remove(
@@ -99,11 +86,28 @@ public class DruidCoordinatorSegmentCompactorTest
segment.getShardSpec().createChunk(segment)
)
);
- timeline.add(
- compactInterval,
- compactSegment.getVersion(),
- compactSegment.getShardSpec().createChunk(compactSegment)
- );
+ final String version = "newVersion_" + compactVersionSuffix++;
+ final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
+ for (int i = 0; i < 2; i++) {
+ DataSegment compactSegment = new DataSegment(
+ segments.get(0).getDataSource(),
+ compactInterval,
+ version,
+ null,
+ segments.get(0).getDimensions(),
+ segments.get(0).getMetrics(),
+ new NumberedShardSpec(i, 0),
+ 1,
+ segmentSize
+ );
+
+ timeline.add(
+ compactInterval,
+ compactSegment.getVersion(),
+ compactSegment.getShardSpec().createChunk(compactSegment)
+ );
+ }
+
return "task_" + idSuffix++;
}
@@ -129,7 +133,7 @@ public class DruidCoordinatorSegmentCompactorTest
for (int i = 0; i < 3; i++) {
final String dataSource = DATA_SOURCE_PREFIX + i;
for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
- for (int k = 0; k < 2; k++) {
+ for (int k = 0; k < 4; k++) {
segments.add(createSegment(dataSource, j, true, k));
segments.add(createSegment(dataSource, j, false, k));
}
@@ -187,7 +191,7 @@ public class DruidCoordinatorSegmentCompactorTest
}
};
int expectedCompactTaskCount = 1;
- int expectedRemainingSegments = 200;
+ int expectedRemainingSegments = 400;
// compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
assertCompactSegments(
@@ -197,7 +201,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
- expectedRemainingSegments -= 20;
+ expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
@@ -207,7 +211,7 @@ public class DruidCoordinatorSegmentCompactorTest
);
// compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
- expectedRemainingSegments -= 20;
+ expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
@@ -215,7 +219,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
- expectedRemainingSegments -= 20;
+ expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
@@ -225,7 +229,7 @@ public class DruidCoordinatorSegmentCompactorTest
);
for (int endDay = 4; endDay > 1; endDay -= 1) {
- expectedRemainingSegments -= 20;
+ expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
@@ -233,7 +237,7 @@ public class DruidCoordinatorSegmentCompactorTest
expectedCompactTaskCount,
expectedVersionSupplier
);
- expectedRemainingSegments -= 20;
+ expectedRemainingSegments -= 40;
assertCompactSegments(
compactor,
Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
@@ -296,10 +300,12 @@ public class DruidCoordinatorSegmentCompactorTest
List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(expectedInterval);
Assert.assertEquals(1, holders.size());
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
- Assert.assertEquals(1, chunks.size());
- DataSegment segment = chunks.get(0).getObject();
- Assert.assertEquals(expectedInterval, segment.getInterval());
- Assert.assertEquals(expectedVersionSupplier.get(), segment.getVersion());
+ Assert.assertEquals(2, chunks.size());
+ final String expectedVersion = expectedVersionSupplier.get();
+ for (PartitionChunk<DataSegment> chunk : chunks) {
+ Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
+ Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
+ }
}
}
@@ -313,7 +319,7 @@ public class DruidCoordinatorSegmentCompactorTest
Assert.assertEquals(1, holders.size());
for (TimelineObjectHolder<String, DataSegment> holder : holders) {
List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
- Assert.assertEquals(2, chunks.size());
+ Assert.assertEquals(4, chunks.size());
for (PartitionChunk<DataSegment> chunk : chunks) {
DataSegment segment = chunk.getObject();
Assert.assertEquals(interval, segment.getInterval());
@@ -369,7 +375,7 @@ public class DruidCoordinatorSegmentCompactorTest
dataSource,
0,
50L,
- 50L,
+ 20L,
null,
null,
new Period("PT1H"), // smaller than segment interval
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org