You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by as...@apache.org on 2018/12/18 11:10:53 UTC
[incubator-druid] branch master updated: Fix auto compaction when
the firstSegment is in skipOffset (#6738)
This is an automated email from the ASF dual-hosted git repository.
asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new f0ee6bf Fix auto compaction when the firstSegment is in skipOffset (#6738)
f0ee6bf is described below
commit f0ee6bf8981b0aa69d227da851274efb09cc1486
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Dec 18 03:10:46 2018 -0800
Fix auto compaction when the firstSegment is in skipOffset (#6738)
* Fix auto compaction when the firstSegment is in skipOffset
* remove duplicate
---
.../helper/NewestSegmentFirstIterator.java | 46 +++++++++++++---------
.../helper/NewestSegmentFirstPolicyTest.java | 40 +++++++++++++++++++
2 files changed, 68 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 99d3d3f..d114fff 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -30,6 +30,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
@@ -80,7 +81,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
if (config != null && !timeline.isEmpty()) {
final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
- timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+ if (searchInterval != null) {
+ timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+ }
}
}
@@ -339,8 +342,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
* @param timeline timeline of a dataSource
* @param skipOffset skipOffset
*
- * @return found searchInterval
+ * @return found interval to search or null if it's not found
*/
+ @Nullable
private static Interval findInitialSearchInterval(
VersionedIntervalTimeline<String, DataSegment> timeline,
Period skipOffset
@@ -354,25 +358,31 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
- final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
- new Interval(first.getInterval().getStart(), last.getInterval().getEnd().minus(skipOffset))
- );
+ final DateTime lookupStart = first.getInterval().getStart();
+ final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
+ if (lookupStart.isBefore(lookupEnd)) {
+ final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
+ new Interval(lookupStart, lookupEnd)
+ );
- final List<DataSegment> segments = holders
- .stream()
- .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
- .map(PartitionChunk::getObject)
- .filter(segment -> !segment.getInterval().overlaps(skipInterval))
- .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
- .collect(Collectors.toList());
+ final List<DataSegment> segments = holders
+ .stream()
+ .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+ .map(PartitionChunk::getObject)
+ .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+ .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
+ .collect(Collectors.toList());
- if (segments.isEmpty()) {
- return new Interval(first.getInterval().getStart(), first.getInterval().getStart());
+ if (segments.isEmpty()) {
+ return null;
+ } else {
+ return new Interval(
+ segments.get(0).getInterval().getStart(),
+ segments.get(segments.size() - 1).getInterval().getEnd()
+ );
+ }
} else {
- return new Interval(
- segments.get(0).getInterval().getStart(),
- segments.get(segments.size() - 1).getInterval().getEnd()
- );
+ return null;
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 17c43f8..5ddbaab 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -496,6 +496,46 @@ public class NewestSegmentFirstPolicyTest
Assert.assertFalse(iterator.hasNext());
}
+ @Test
+ public void testIfFirstSegmentIsInSkipOffset()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"),
+ new Period("PT5H"),
+ 40000,
+ 1
+ )
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, timeline)
+ );
+
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testIfFirstSegmentOverlapsSkipOffset()
+ {
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"),
+ new Period("PT5H"),
+ 40000,
+ 1
+ )
+ );
+
+ final CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+ ImmutableMap.of(DATA_SOURCE, timeline)
+ );
+
+ Assert.assertFalse(iterator.hasNext());
+ }
+
private static void assertCompactSegmentIntervals(
CompactionSegmentIterator iterator,
Period segmentPeriod,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org