You are viewing a plain-text version of this content. The canonical version is available at the original archive link, which is not preserved in this plain-text rendering.
Posted to commits@druid.apache.org by as...@apache.org on 2018/12/18 11:10:53 UTC

[incubator-druid] branch master updated: Fix auto compaction when the firstSegment is in skipOffset (#6738)

This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f0ee6bf  Fix auto compaction when the firstSegment is in skipOffset (#6738)
f0ee6bf is described below

commit f0ee6bf8981b0aa69d227da851274efb09cc1486
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Dec 18 03:10:46 2018 -0800

    Fix auto compaction when the firstSegment is in skipOffset (#6738)
    
    * Fix auto compaction when the firstSegment is in skipOffset
    
    * remove duplicate
---
 .../helper/NewestSegmentFirstIterator.java         | 46 +++++++++++++---------
 .../helper/NewestSegmentFirstPolicyTest.java       | 40 +++++++++++++++++++
 2 files changed, 68 insertions(+), 18 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 99d3d3f..d114fff 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -30,6 +30,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
@@ -80,7 +81,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
       if (config != null && !timeline.isEmpty()) {
         final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        if (searchInterval != null) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        }
       }
     }
 
@@ -339,8 +342,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
    * @param timeline   timeline of a dataSource
    * @param skipOffset skipOFfset
    *
-   * @return found searchInterval
+   * @return found interval to search or null if it's not found
    */
+  @Nullable
   private static Interval findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
       Period skipOffset
@@ -354,25 +358,31 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
 
-    final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-        new Interval(first.getInterval().getStart(), last.getInterval().getEnd().minus(skipOffset))
-    );
+    final DateTime lookupStart = first.getInterval().getStart();
+    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
+    if (lookupStart.isBefore(lookupEnd)) {
+      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
+          new Interval(lookupStart, lookupEnd)
+      );
 
-    final List<DataSegment> segments = holders
-        .stream()
-        .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
-        .map(PartitionChunk::getObject)
-        .filter(segment -> !segment.getInterval().overlaps(skipInterval))
-        .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
-        .collect(Collectors.toList());
+      final List<DataSegment> segments = holders
+          .stream()
+          .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+          .map(PartitionChunk::getObject)
+          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
+          .collect(Collectors.toList());
 
-    if (segments.isEmpty()) {
-      return new Interval(first.getInterval().getStart(), first.getInterval().getStart());
+      if (segments.isEmpty()) {
+        return null;
+      } else {
+        return new Interval(
+            segments.get(0).getInterval().getStart(),
+            segments.get(segments.size() - 1).getInterval().getEnd()
+        );
+      }
     } else {
-      return new Interval(
-          segments.get(0).getInterval().getStart(),
-          segments.get(segments.size() - 1).getInterval().getEnd()
-      );
+      return null;
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 17c43f8..5ddbaab 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -496,6 +496,46 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIfFirstSegmentIsInSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIfFirstSegmentOverlapsSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org