Posted to commits@druid.apache.org by cw...@apache.org on 2018/11/15 09:00:57 UTC

[incubator-druid] branch master updated: Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0395d55  Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622)
0395d55 is described below

commit 0395d554e16c94955158ec0cbe9c364496600be5
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Nov 15 01:00:51 2018 -0800

    Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator (#6622)
    
    * Properly reset total size of segmentsToCompact in NewestSegmentFirstIterator
    
    * add test
---
 .../helper/NewestSegmentFirstIterator.java         | 90 +++++++++++++++-------
 .../helper/NewestSegmentFirstPolicyTest.java       | 43 +++++++++++
 2 files changed, 105 insertions(+), 28 deletions(-)
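
Background on the fix: before this patch, the iterator tracked the candidate
segments and their running byte total in two separate locals
(segmentsToCompact and totalSegmentsToCompactBytes), and the skip paths in
the warn branches cleared the list without resetting the total. The stale
total could then fail the size check for segments that actually fit,
surfacing as a spurious ISE. The patch moves both fields behind the
SegmentsToCompact holder so that add() and clear() always mutate them
together. A minimal, self-contained sketch of that invariant (illustrative
only, not Druid code; the class name and values are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for SegmentsToCompact: the list and its derived
    // total are only mutated through add() and clear(), so clear() can never
    // leave a stale byte count behind.
    class SizedBatch
    {
      private final List<Long> sizes = new ArrayList<>();
      private long totalSize;

      void add(long size)
      {
        sizes.add(size);
        totalSize += size;
      }

      void clear()
      {
        sizes.clear();
        totalSize = 0; // the pre-patch code missed this reset on the skip paths
      }

      long getTotalSize()
      {
        return totalSize;
      }

      int getNumSegments()
      {
        return sizes.size();
      }
    }

    public class SizedBatchDemo
    {
      public static void main(String[] args)
      {
        SizedBatch batch = new SizedBatch();
        batch.add(400_010);  // newest segment picked up first
        batch.clear();       // an oversized shard is skipped: both fields reset
        batch.add(266_676);
        batch.add(266_676);
        // Prints "2 segments, 533352 bytes"; with a stale total the batch
        // would have appeared to hold 933362 bytes and blown an 800000 budget.
        System.out.println(batch.getNumSegments() + " segments, " + batch.getTotalSize() + " bytes");
      }
    }
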

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index f65647a..99d3d3f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -168,7 +168,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
         config
     );
 
-    if (segmentsToCompact.getSize() > 1) {
+    if (segmentsToCompact.getNumSegments() > 1) {
       queue.add(new QueueEntry(segmentsToCompact.segments));
     }
   }
@@ -247,13 +247,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
     final boolean keepSegmentGranularity = config.isKeepSegmentGranularity();
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
     final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
-    final List<DataSegment> segmentsToCompact = new ArrayList<>();
-    long totalSegmentsToCompactBytes = 0;
+    final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
 
     // Finds segments to compact together while iterating timeline from latest to oldest
     while (compactibleTimelineObjectHolderCursor.hasNext()
-           && totalSegmentsToCompactBytes < inputSegmentSize
-           && segmentsToCompact.size() < maxNumSegmentsToCompact) {
+           && segmentsToCompact.getTotalSize() < inputSegmentSize
+           && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
       final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
           compactibleTimelineObjectHolderCursor.get(),
           "timelineObjectHolder"
@@ -262,22 +261,26 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
 
       // The segments in a holder should be added all together or not at all.
-      if (SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, totalSegmentsToCompactBytes, timeChunkSizeBytes)
-          && SegmentCompactorUtil.isCompactibleNum(maxNumSegmentsToCompact, segmentsToCompact.size(), chunks.size())
-          && (!keepSegmentGranularity || segmentsToCompact.size() == 0)) {
+      final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
+          inputSegmentSize,
+          segmentsToCompact.getTotalSize(),
+          timeChunkSizeBytes
+      );
+      final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
+          maxNumSegmentsToCompact,
+          segmentsToCompact.getNumSegments(),
+          chunks.size()
+      );
+      if (isCompactibleSize && isCompactibleNum && (!keepSegmentGranularity || segmentsToCompact.isEmpty())) {
         chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-        totalSegmentsToCompactBytes += timeChunkSizeBytes;
       } else {
-        if (segmentsToCompact.size() > 1) {
+        if (segmentsToCompact.getNumSegments() > 1) {
           // We found some segments to compact and cannot add more. End here.
-          return new SegmentsToCompact(segmentsToCompact);
+          return segmentsToCompact;
         } else {
-          // (*) Discard segments found so far because we can't compact them anyway.
-          final int numSegmentsToCompact = segmentsToCompact.size();
-          segmentsToCompact.clear();
-
           if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
             final DataSegment segment = chunks.get(0).getObject();
+            segmentsToCompact.clear();
             log.warn(
                 "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
                 + " Continue to the next shard.",
@@ -288,6 +291,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
             );
           } else if (maxNumSegmentsToCompact < chunks.size()) {
             final DataSegment segment = chunks.get(0).getObject();
+            segmentsToCompact.clear();
             log.warn(
                 "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
                 + "numTargetCompactSegments[%d]. If you see lots of shards are being skipped due to too many "
@@ -299,18 +303,19 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
                 maxNumSegmentsToCompact
             );
           } else {
-            if (numSegmentsToCompact == 1) {
+            if (segmentsToCompact.getNumSegments() == 1) {
               // We found a segment which is smaller than targetCompactionSize but too large to compact with other
               // segments. Skip this one.
-              // Note that segmentsToCompact is already cleared at (*).
+              segmentsToCompact.clear();
               chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-              totalSegmentsToCompactBytes = timeChunkSizeBytes;
             } else {
               throw new ISE(
-                  "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s]",
+                  "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
+                  + "with current segmentsToCompact[%s]",
                   chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
                   timeChunkSizeBytes,
-                  chunks.size()
+                  chunks.size(),
+                  segmentsToCompact
               );
             }
           }
@@ -320,11 +325,12 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       compactibleTimelineObjectHolderCursor.next();
     }
 
-    if (segmentsToCompact.size() > 1) {
-      return new SegmentsToCompact(segmentsToCompact);
-    } else {
-      return new SegmentsToCompact(Collections.emptyList());
+    if (segmentsToCompact.getNumSegments() == 1) {
+      // Don't compact a single segment
+      segmentsToCompact.clear();
     }
+
+    return segmentsToCompact;
   }
 
   /**
@@ -394,16 +400,44 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   private static class SegmentsToCompact
   {
-    private final List<DataSegment> segments;
+    private final List<DataSegment> segments = new ArrayList<>();
+    private long totalSize;
 
-    private SegmentsToCompact(List<DataSegment> segments)
+    private void add(DataSegment segment)
     {
-      this.segments = segments;
+      segments.add(segment);
+      totalSize += segment.getSize();
+    }
+
+    private boolean isEmpty()
+    {
+      Preconditions.checkState((totalSize == 0) == segments.isEmpty());
+      return segments.isEmpty();
     }
 
-    private int getSize()
+    private int getNumSegments()
     {
       return segments.size();
     }
+
+    private long getTotalSize()
+    {
+      return totalSize;
+    }
+
+    private void clear()
+    {
+      segments.clear();
+      totalSize = 0;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "SegmentsToCompact{" +
+             "segments=" + segments +
+             ", totalSize=" + totalSize +
+             '}';
+    }
   }
 }
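
A design note on the isEmpty() check above: asserting
(totalSize == 0) == segments.isEmpty() at read time turns any future drift
between the two fields into an immediate failure rather than a silent
selection bug. A minimal sketch of the same guard in plain Java (hypothetical
class, shown without the Guava Preconditions dependency used in the patch):

    import java.util.ArrayList;
    import java.util.List;

    public class CheckedBatch
    {
      private final List<Long> sizes = new ArrayList<>();
      private long totalSize;

      void add(long size)
      {
        sizes.add(size);
        totalSize += size;
      }

      void clear()
      {
        sizes.clear();
        totalSize = 0;
      }

      boolean isEmpty()
      {
        // Mirrors the Preconditions.checkState call in the patch: any code
        // path that mutates one field without the other fails fast here.
        if ((totalSize == 0) != sizes.isEmpty()) {
          throw new IllegalStateException("segments and totalSize drifted apart");
        }
        return sizes.isEmpty();
      }

      public static void main(String[] args)
      {
        CheckedBatch batch = new CheckedBatch();
        batch.add(100);
        batch.clear();
        System.out.println(batch.isEmpty()); // true; the invariant holds
      }
    }
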
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 1185820..17c43f8 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -42,6 +43,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 @RunWith(Parameterized.class)
 public class NewestSegmentFirstPolicyTest
@@ -453,6 +455,47 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testClearSegmentsToCompactWhenSkippingSegments()
+  {
+    final long maxSizeOfSegmentsToCompact = 800000;
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-03T00:00:00/2017-12-04T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact / 2 + 10,
+            1
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-02T00:00:00/2017-12-03T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact + 10, // large segment
+            1
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-12-01T00:00:00/2017-12-02T00:00:00"),
+            new Period("P1D"),
+            maxSizeOfSegmentsToCompact / 3 + 10,
+            2
+        )
+    );
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
+        ImmutableMap.of(DATA_SOURCE, timeline)
+    );
+
+    final List<DataSegment> expectedSegmentsToCompact = timeline
+        .lookup(Intervals.of("2017-12-01/2017-12-02"))
+        .stream()
+        .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
+        .map(PartitionChunk::getObject)
+        .collect(Collectors.toList());
+
+    Assert.assertTrue(iterator.hasNext());
+    Assert.assertEquals(expectedSegmentsToCompact, iterator.next());
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,


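The added test pins down the original failure sequence: the newest day
contributes one 400,010-byte segment, the middle day's 800,010-byte segment
exceeds the budget and is skipped, and the oldest day's two 266,676-byte
segments must still be selected, which only works if the skip reset the
running total. A sketch of the arithmetic, assuming the 800,000-byte budget
from the test (illustrative only; run with -ea to enable the asserts):

    public class TestArithmetic
    {
      public static void main(String[] args)
      {
        final long max = 800_000L;
        final long day3 = max / 2 + 10;  // 400_010: newest, picked up first
        final long day2 = max + 10;      // 800_010: oversized, skipped, holder cleared
        final long day1 = max / 3 + 10;  // 266_676: two of these per the spec

        // With the fix, the total restarts at 0 after the skip:
        assert 2 * day1 < max;           // 533_352 fits the budget
        // Without the fix, day3's stale bytes poisoned the size check:
        assert day3 + 2 * day1 > max;    // 933_362 would wrongly exceed it
        System.out.println("fits=" + (2 * day1) + ", stale=" + (day3 + 2 * day1));
      }
    }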