You are viewing a plain-text version of this content; the canonical version is linked from the original archive page.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/18 11:10:48 UTC

[GitHub] asdf2014 closed pull request #6738: Fix auto compaction when the firstSegment is in skipOffset

asdf2014 closed pull request #6738: Fix auto compaction when the firstSegment is in skipOffset
URL: https://github.com/apache/incubator-druid/pull/6738
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (because GitHub would not otherwise display it):

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 99d3d3fa4fd..d114fff0d7e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -30,6 +30,7 @@
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 
@@ -80,7 +81,9 @@
 
       if (config != null && !timeline.isEmpty()) {
         final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        if (searchInterval != null) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        }
       }
     }
 
@@ -339,8 +342,9 @@ private static SegmentsToCompact findSegmentsToCompact(
    * @param timeline   timeline of a dataSource
    * @param skipOffset skipOFfset
    *
-   * @return found searchInterval
+   * @return found interval to search or null if it's not found
    */
+  @Nullable
   private static Interval findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
       Period skipOffset
@@ -354,25 +358,31 @@ private static Interval findInitialSearchInterval(
 
     final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
 
-    final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-        new Interval(first.getInterval().getStart(), last.getInterval().getEnd().minus(skipOffset))
-    );
+    final DateTime lookupStart = first.getInterval().getStart();
+    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
+    if (lookupStart.isBefore(lookupEnd)) {
+      final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
+          new Interval(lookupStart, lookupEnd)
+      );
 
-    final List<DataSegment> segments = holders
-        .stream()
-        .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
-        .map(PartitionChunk::getObject)
-        .filter(segment -> !segment.getInterval().overlaps(skipInterval))
-        .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
-        .collect(Collectors.toList());
+      final List<DataSegment> segments = holders
+          .stream()
+          .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+          .map(PartitionChunk::getObject)
+          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
+          .collect(Collectors.toList());
 
-    if (segments.isEmpty()) {
-      return new Interval(first.getInterval().getStart(), first.getInterval().getStart());
+      if (segments.isEmpty()) {
+        return null;
+      } else {
+        return new Interval(
+            segments.get(0).getInterval().getStart(),
+            segments.get(segments.size() - 1).getInterval().getEnd()
+        );
+      }
     } else {
-      return new Interval(
-          segments.get(0).getInterval().getStart(),
-          segments.get(segments.size() - 1).getInterval().getEnd()
-      );
+      return null;
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 17c43f88cac..5ddbaab6cd9 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -496,6 +496,46 @@ public void testClearSegmentsToCompactWhenSkippingSegments()
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIfFirstSegmentIsInSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-02T14:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
+  public void testIfFirstSegmentOverlapsSkipOffset()
+  {
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-01T23:00:00/2017-12-03T00:00:00"),
+            new Period("PT5H"),
+            40000,
+            1
+        )
+    );
+
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org