Posted to commits@druid.apache.org by ji...@apache.org on 2018/11/15 19:29:16 UTC
[incubator-druid] branch 0.13.0-incubating updated: Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622) (#6627)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.13.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.13.0-incubating by this push:
new f2d9123 Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622) (#6627)
f2d9123 is described below
commit f2d912304244fb2938f0f9f1390f3717d5d6598d
Author: David Lim <da...@apache.org>
AuthorDate: Thu Nov 15 12:29:08 2018 -0700
Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622) (#6627)
* Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator
* add test
---
.../helper/NewestSegmentFirstIterator.java | 90 +++++++++++++++-------
.../helper/NewestSegmentFirstPolicyTest.java | 43 +++++++++++
2 files changed, 105 insertions(+), 28 deletions(-)
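
Before the diff, a note on the bug being fixed: the old iterator kept the candidate segments in a plain List and the running byte total in a separate local long, and one skip path cleared the list without zeroing the total, so later size checks compared against stale bytes. A minimal standalone sketch of that failure mode (names here are illustrative, not Druid APIs):

    import java.util.ArrayList;
    import java.util.List;

    class StaleTotalDemo
    {
      public static void main(String[] args)
      {
        final List<Long> segmentSizes = new ArrayList<>();
        long totalBytes = 0;

        // Select a candidate segment.
        segmentSizes.add(400_010L);
        totalBytes += 400_010L;

        // Buggy skip path: discard the candidates but forget the total.
        segmentSizes.clear();

        // The list is empty, yet totalBytes still reports 400010, so any
        // later "totalBytes + nextChunk <= limit" check runs against bytes
        // that are no longer selected.
        System.out.println("segments=" + segmentSizes.size() + ", totalBytes=" + totalBytes);
      }
    }

The patch closes this hole by moving both fields behind one small class, SegmentsToCompact, whose add() and clear() always update them together.
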
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index f65647a..99d3d3f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -168,7 +168,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
config
);
- if (segmentsToCompact.getSize() > 1) {
+ if (segmentsToCompact.getNumSegments() > 1) {
queue.add(new QueueEntry(segmentsToCompact.segments));
}
}
@@ -247,13 +247,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final boolean keepSegmentGranularity = config.isKeepSegmentGranularity();
final long inputSegmentSize = config.getInputSegmentSizeBytes();
final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
- final List<DataSegment> segmentsToCompact = new ArrayList<>();
- long totalSegmentsToCompactBytes = 0;
+ final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
// Finds segments to compact together while iterating timeline from latest to oldest
while (compactibleTimelineObjectHolderCursor.hasNext()
- && totalSegmentsToCompactBytes < inputSegmentSize
- && segmentsToCompact.size() < maxNumSegmentsToCompact) {
+ && segmentsToCompact.getTotalSize() < inputSegmentSize
+ && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
compactibleTimelineObjectHolderCursor.get(),
"timelineObjectHolder"
@@ -262,22 +261,26 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
// The segments in a holder should either all be added together or none at all.
- if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, totalSegmentsToCompactBytes, timeChunkSizeBytes)
- && SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact, segmentsToCompact.size(), chunks.size())
- && (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
+ final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
+ inputSegmentSize,
+ segmentsToCompact.getTotalSize(),
+ timeChunkSizeBytes
+ );
+ final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
+ maxNumSegmentsToCompact,
+ segmentsToCompact.getNumSegments(),
+ chunks.size()
+ );
+ if (isCompactibleSize && isCompactibleNum && (!keepSegmentGranularity || segmentsToCompact.isEmpty())) {
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
- totalSegmentsToCompactBytes += timeChunkSizeBytes;
} else {
- if (segmentsToCompact.size() > 1) {
+ if (segmentsToCompact.getNumSegments() > 1) {
// We found some segments to compact and cannot add more. End here.
- return new SegmentsToCompact(segmentsToCompact);
+ return segmentsToCompact;
} else {
- // (*) Discard segments found so far because we can't compact them anyway.
- final int numSegmentsToCompact = segmentsToCompact.size();
- segmentsToCompact.clear();
-
if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
final DataSegment segment = chunks.get(0).getObject();
+ segmentsToCompact.clear();
log.warn(
"shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+ " Continue to the next shard.",
@@ -288,6 +291,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
);
} else if (maxNumSegmentsToCompact < chunks.size()) {
final DataSegment segment = chunks.get(0).getObject();
+ segmentsToCompact.clear();
log.warn(
"The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
+ "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
@@ -299,18 +303,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
maxNumSegmentsToCompact
);
} else {
- if (numSegmentsToCompact == 1) {
+ if (segmentsToCompact.getNumSegments() == 1) {
// We found a segment which is smaller than targetCompactionSize but too large to compact with other
// segments. Skip this one.
- // Note that segmentsToCompact is already cleared at (*).
+ segmentsToCompact.clear();
chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
- totalSegmentsToCompactBytes = timeChunkSizeBytes;
} else {
throw new ISE(
- "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]",
+ "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
+ + "with current segmentsToCompact[%s]",
chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
timeChunkSizeBytes,
- chunks.size()
+ chunks.size(),
+ segmentsToCompact
);
}
}
@@ -320,11 +325,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
compactibleTimelineObjectHolderCursor.next();
}
- if (segmentsToCompact.size() > 1) {
- return new SegmentsToCompact(segmentsToCompact);
- } else {
- return new SegmentsToCompact(Collections.emptyList());
+ if (segmentsToCompact.getNumSegments() == 1) {
+ // Don't compact a single segment
+ segmentsToCompact.clear();
}
+
+ return segmentsToCompact;
}
/**
@@ -394,16 +400,44 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
private static class SegmentsToCompact
{
- private final List<DataSegment> segments;
+ private final List<DataSegment> segments = new ArrayList<>();
+ private long totalSize;
- private SegmentsToCompact(List<DataSegment> segments)
+ private void add(DataSegment segment)
{
- this.segments = segments;
+ segments.add(segment);
+ totalSize += segment.getSize();
+ }
+
+ private boolean isEmpty()
+ {
+ Preconditions.checkState((totalSize == 0) == segments.isEmpty());
+ return segments.isEmpty();
}
- private int getSize()
+ private int getNumSegments()
{
return segments.size();
}
+
+ private long getTotalSize()
+ {
+ return totalSize;
+ }
+
+ private void clear()
+ {
+ segments.clear();
+ totalSize = 0;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "SegmentsToCompact{" +
+ "segments=" + segments +
+ ", totalSize=" + totalSize +
+ '}';
+ }
}
}
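
The refactored SegmentsToCompact holder above is the heart of the fix. For reference outside the diff, here is a condensed standalone sketch of the same pattern, with DataSegment replaced by a plain size so it compiles on its own (a sketch of the idea, not the Druid class itself):

    import java.util.ArrayList;
    import java.util.List;

    class SegmentsToCompactSketch
    {
      private final List<Long> segmentSizes = new ArrayList<>(); // stands in for List<DataSegment>
      private long totalSize;

      void add(long segmentSize)
      {
        segmentSizes.add(segmentSize);
        totalSize += segmentSize;
      }

      boolean isEmpty()
      {
        // Invariant checked by the patch: a zero total must coincide with an
        // empty list; any drift fails fast instead of silently skewing later
        // size comparisons.
        if ((totalSize == 0) != segmentSizes.isEmpty()) {
          throw new IllegalStateException("totalSize out of sync with segments");
        }
        return segmentSizes.isEmpty();
      }

      long getTotalSize()
      {
        return totalSize;
      }

      void clear()
      {
        segmentSizes.clear();
        totalSize = 0; // the reset the old code path missed
      }
    }

Making clear() the only way to discard candidates means every skip branch in findSegmentsToCompact now resets the byte total for free, which is exactly the property the new test pins down.
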
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 1185820..17c43f8 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -42,6 +43,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
@RunWith(Parameterized.class)
public class NewestSegmentFirstPolicyTest
@@ -453,6 +455,47 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
+ @Test
+ public void testClearSegmentsToCompactWhenSkippingSegments()
+ {
+ final long maxSizeOfSegmentsToCompact = 800000;
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"),
+ new Period("P1D"),
+ maxSizeOfSegmentsToCompact / 2 + 10,
+ 1
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
+ new Period("P1D"),
+ maxSizeOfSegmentsToCompact + 10, // large segment
+ 1
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
+ new Period("P1D"),
+ maxSizeOfSegmentsToCompact / 3 + 10,
+ 2
+ )
+ );
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
+ ImmutableMap.of(DATA_SOURCE, timeline)
+ );
+
+ final List<DataSegment> expectedSegmentsToCompact = timeline
+ .lookup(Intervals.of("2017-12-01/2017-12-02"))
+ .stream()
+ .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+ .map(PartitionChunk::getObject)
+ .collect(Collectors.toList());
+
+ Assert.assertTrue(iterator.hasNext());
+ Assert.assertEquals(expectedSegmentsToCompact, iterator.next());
+ Assert.assertFalse(iterator.hasNext());
+ }
+
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
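
The new test's sizes are chosen so that a stale total is observable. Assuming the spec's size argument is per segment and that isCompactibleSize checks the running total plus the next chunk against the limit, the arithmetic can be replayed in a small standalone demo (illustrative names, not the test code):

    public class TraceCompactionSizes
    {
      public static void main(String[] args)
      {
        final long limit = 800_000;             // maxSizeOfSegmentsToCompact
        final long day3 = limit / 2 + 10;       // 400010: one segment, visited first (newest)
        final long day2 = limit + 10;           // 800010: skipped, larger than the limit
        final long day1 = 2 * (limit / 3 + 10); // 533352: two segments on the oldest day

        // Old bookkeeping: skipping day2 cleared the list but left day3's
        // bytes in the total, so day1 appears not to fit.
        long staleTotal = day3;
        System.out.println("stale total: day1 fits? " + (staleTotal + day1 <= limit)); // false

        // Fixed bookkeeping: clear() also zeroes the total, and day1 fits.
        long resetTotal = 0;
        System.out.println("reset total: day1 fits? " + (resetTotal + day1 <= limit)); // true
      }
    }

With the fix, the iterator therefore yields exactly the two 2017-12-01 segments, which is what the test's assertions verify.
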