Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/15 19:29:11 UTC

[GitHub] jihoonson closed pull request #6627: [Backport] Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator

URL: https://github.com/apache/incubator-druid/pull/6627
This is a pull request merged from a forked repository. Because GitHub hides the original diff once a fork-based pull request is merged, it is reproduced below for the sake of provenance.
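
For readers skimming the patch: findSegmentsToCompact previously tracked the candidate segments in a bare List<DataSegment> plus a separate running byte total, and one skip path cleared the list without resetting that total, so later size checks compared against a stale value. The fix folds both pieces of state into the private SegmentsToCompact holder so they can only change together. A condensed sketch of that holder, with names taken from the diff below (the real class is a private nested class of NewestSegmentFirstIterator and also gains a toString() used in the error message):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.druid.timeline.DataSegment;

    class SegmentsToCompact
    {
      private final List<DataSegment> segments = new ArrayList<>();
      private long totalSize;

      void add(DataSegment segment)
      {
        // The list and the byte total always change together.
        segments.add(segment);
        totalSize += segment.getSize();
      }

      void clear()
      {
        // Resetting both fields here is the heart of the fix; before the patch,
        // the caller cleared the list but left a separate total untouched.
        segments.clear();
        totalSize = 0;
      }

      boolean isEmpty()
      {
        return segments.isEmpty();
      }

      int getNumSegments()
      {
        return segments.size();
      }

      long getTotalSize()
      {
        return totalSize;
      }
    }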

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index f65647ac3d9..99d3d3fa4fd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -168,7 +168,7 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig confi
         config
     );
 
-    if (segmentsToCompact.getSize() > 1) {
+    if (segmentsToCompact.getNumSegments() > 1) {
       queue.add(new QueueEntry(segmentsToCompact.segments));
     }
   }
@@ -247,13 +247,12 @@ private static SegmentsToCompact findSegmentsToCompact(
     final boolean keepSegmentGranularity = config.isKeepSegmentGranularity();
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
     final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
-    final List<DataSegment> segmentsToCompact = new ArrayList<>();
-    long totalSegmentsToCompactBytes = 0;
+    final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
 
     // Finds segments to compact together while iterating timeline from latest to oldest
     while (compactibleTimelineObjectHolderCursor.hasNext()
-           && totalSegmentsToCompactBytes < inputSegmentSize
-           && segmentsToCompact.size() < maxNumSegmentsToCompact) {
+           && segmentsToCompact.getTotalSize() < inputSegmentSize
+           && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
       final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
           compactibleTimelineObjectHolderCursor.get(),
           "timelineObjectHolder"
@@ -262,22 +261,26 @@ private static SegmentsToCompact findSegmentsToCompact(
       final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
 
       // The segments in a holder should be added all together or not.
-      if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, totalSegmentsToCompactBytes, timeChunkSizeBytes)
-          && SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact, segmentsToCompact.size(), chunks.size())
-          && (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
+      final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
+          inputSegmentSize,
+          segmentsToCompact.getTotalSize(),
+          timeChunkSizeBytes
+      );
+      final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
+          maxNumSegmentsToCompact,
+          segmentsToCompact.getNumSegments(),
+          chunks.size()
+      );
+      if (isCompactibleSize && isCompactibleNum && (!keepSegmentGranularity || segmentsToCompact.isEmpty())) {
         chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-        totalSegmentsToCompactBytes += timeChunkSizeBytes;
       } else {
-        if (segmentsToCompact.size() > 1) {
+        if (segmentsToCompact.getNumSegments() > 1) {
          // We found some segments to compact and cannot add more. End here.
-          return new SegmentsToCompact(segmentsToCompact);
+          return segmentsToCompact;
         } else {
-          // (*) Discard segments found so far because we can't compact them anyway.
-          final int numSegmentsToCompact = segmentsToCompact.size();
-          segmentsToCompact.clear();
-
           if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
             final DataSegment segment = chunks.get(0).getObject();
+            segmentsToCompact.clear();
             log.warn(
                 "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
                 + " Continue to the next shard.",
@@ -288,6 +291,7 @@ private static SegmentsToCompact findSegmentsToCompact(
             );
           } else if (maxNumSegmentsToCompact < chunks.size()) {
             final DataSegment segment = chunks.get(0).getObject();
+            segmentsToCompact.clear();
             log.warn(
                 "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
                 + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
@@ -299,18 +303,19 @@ private static SegmentsToCompact findSegmentsToCompact(
                 maxNumSegmentsToCompact
             );
           } else {
-            if (numSegmentsToCompact == 1) {
+            if (segmentsToCompact.getNumSegments() == 1) {
               // We found a segment which is smaller than targetCompactionSize but too large to compact with other
               // segments. Skip this one.
-              // Note that segmentsToCompact is already cleared at (*).
+              segmentsToCompact.clear();
               chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-              totalSegmentsToCompactBytes = timeChunkSizeBytes;
             } else {
               throw new ISE(
-                  "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]",
+                  "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
+                  + "with current segmentsToCompact[%s]",
                   chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
                   timeChunkSizeBytes,
-                  chunks.size()
+                  chunks.size(),
+                  segmentsToCompact
               );
             }
           }
@@ -320,11 +325,12 @@ private static SegmentsToCompact findSegmentsToCompact(
       compactibleTimelineObjectHolderCursor.next();
     }
 
-    if (segmentsToCompact.size() > 1) {
-      return new SegmentsToCompact(segmentsToCompact);
-    } else {
-      return new SegmentsToCompact(Collections.emptyList());
+    if (segmentsToCompact.getNumSegments() == 1) {
+      // Don't compact a single segment
+      segmentsToCompact.clear();
     }
+
+    return segmentsToCompact;
   }
 
   /**
@@ -394,16 +400,44 @@ private String getDataSource()
 
   private static class SegmentsToCompact
   {
-    private final List<DataSegment> segments;
+    private final List<DataSegment> segments = new ArrayList<>();
+    private long totalSize;
 
-    private SegmentsToCompact(List<DataSegment> segments)
+    private void add(DataSegment segment)
     {
-      this.segments = segments;
+      segments.add(segment);
+      totalSize += segment.getSize();
+    }
+
+    private boolean isEmpty()
+    {
+      Preconditions.checkState((totalSize == 0) == segments.isEmpty());
+      return segments.isEmpty();
     }
 
-    private int getSize()
+    private int getNumSegments()
     {
       return segments.size();
     }
+
+    private long getTotalSize()
+    {
+      return totalSize;
+    }
+
+    private void clear()
+    {
+      segments.clear();
+      totalSize = 0;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SegmentsToCompact{" +
+             "segments=" + segments +
+             ", totalSize=" + totalSize +
+             '}';
+    }
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 1185820ccc3..17c43f88cac 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -29,6 +29,7 @@
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -42,6 +43,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 @RunWith(Parameterized.class)
 public class NewestSegmentFirstPolicyTest
@@ -453,6 +455,47 @@ public void testIgnoreSingleSegmentToCompact()
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testClearSegmentsToCompactWhenSkippingSegments()
+  {
+    final long maxSizeOfSegmentsToCompact = 800000;
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact / 2 + 10,
+            1
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact + 10, // large segment
+            1
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact / 3 + 10,
+            2
+        )
+    );
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    final List<DataSegment> expectedSegmentsToCompact = timeline
+        .lookup(Intervals.of("2017-12-01/2017-12-02"))
+        .stream()
+        .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+        .map(PartitionChunk::getObject)
+        .collect(Collectors.toList());
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(expectedSegmentsToCompact, iterator.next());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,
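
The new test above, testClearSegmentsToCompactWhenSkippingSegments, pins the regression: when an oversized time chunk forces the iterator to discard a lone candidate, the byte total has to be reset along with the segment list, otherwise the next, perfectly compactible pair is rejected against a stale total. A toy, framework-free illustration of that invariant, using plain longs instead of DataSegment objects (purely expository, not part of the patch):

    import java.util.ArrayList;
    import java.util.List;

    class StaleTotalDemo
    {
      private static final List<Long> sizes = new ArrayList<>();
      private static long totalSize = 0;

      static void add(long size)
      {
        sizes.add(size);
        totalSize += size;
      }

      static void clear()
      {
        sizes.clear();
        totalSize = 0; // the step the pre-fix code skipped on this path
      }

      public static void main(String[] args)
      {
        final long inputSegmentSize = 800_000;

        add(inputSegmentSize / 2 + 10);  // a lone candidate is picked up first
        clear();                         // then discarded because the next time
                                         // chunk is too large to compact

        // Because clear() also reset the byte total, the later small pair fits:
        add(inputSegmentSize / 3 + 10);
        add(inputSegmentSize / 3 + 10);
        System.out.println(totalSize < inputSegmentSize); // prints true
        // Without the reset, the stale 400010 bytes would push the total past
        // inputSegmentSize and the pair would wrongly be skipped.
      }
    }

Encapsulating both fields behind SegmentsToCompact, as the patch does, makes that reset impossible to forget.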