Posted to commits@druid.apache.org by fj...@apache.org on 2019/09/10 06:11:21 UTC

[incubator-druid] branch master updated: Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 762f4d0  Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)
762f4d0 is described below

commit 762f4d0e58b0f62c4225d978ab0d49509eb3598c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Sep 9 23:11:08 2019 -0700

    Check targetCompactionSizeBytes to search for candidate segments in auto compaction (#8495)
    
    * Check targetCompactionSizeBytes to search for candidate segments in auto compaction
    
    * fix logs
    
    * add javadoc
    
    * rename
---
 .../helper/NewestSegmentFirstIterator.java         | 197 +++++++--------------
 .../coordinator/helper/SegmentCompactorUtil.java   |  27 ++-
 .../DruidCoordinatorSegmentCompactorTest.java      |  68 +++----
 3 files changed, 122 insertions(+), 170 deletions(-)
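
In outline, the patch inverts the search strategy: instead of accumulating
segments across adjacent time chunks until a size or count limit is hit, the
iterator now treats each time chunk as an all-or-nothing candidate batch and
applies three checks to it. A condensed, illustrative sketch of the rewritten
selection loop follows (locals renamed for brevity; the full version with its
warning logs is in the diff below):

    // Condensed sketch of the new findSegmentsToCompact() loop (illustrative).
    // The cursor yields whole time chunks, newest first; each chunk is either
    // returned as the candidate set or skipped entirely.
    while (compactibleTimelineObjectHolderCursor.hasNext()) {
      final SegmentsToCompact candidates =
          new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
      final boolean fitsSizeLimit = candidates.getTotalSize() <= inputSegmentSize;
      final boolean fitsCountLimit = candidates.getNumSegments() <= maxNumSegmentsToCompact;
      final boolean needsCompaction =
          SegmentCompactorUtil.needsCompaction(targetCompactionSizeBytes, candidates.segments);
      if (fitsSizeLimit && fitsCountLimit && needsCompaction) {
        return candidates; // first qualifying time chunk wins
      }
      // otherwise: log which check failed and move on to the next time chunk
    }
    return new SegmentsToCompact(); // empty result: nothing qualified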

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index 6548db6..e0eecf1 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -40,6 +40,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -181,9 +182,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   /**
    * Iterates the given {@link VersionedIntervalTimeline}. Only compactible {@link TimelineObjectHolder}s are returned,
-   * which means the holder always has at least one {@link DataSegment} and the total size of segments is larger than 0.
+   * which means the holder always has at least two {@link DataSegment}s.
    */
-  private static class CompactibleTimelineObjectHolderCursor
+  private static class CompactibleTimelineObjectHolderCursor implements Iterator<List<DataSegment>>
   {
     private final List<TimelineObjectHolder<String, DataSegment>> holders;
 
@@ -200,7 +201,7 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
               .filter(holder -> {
                 final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
                 final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-                return chunks.size() > 0
+                return chunks.size() > 1
                        && partitionBytes > 0
                        && interval.contains(chunks.get(0).getObject().getInterval());
               })
@@ -208,32 +209,23 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
           .collect(Collectors.toList());
     }
 
-    boolean hasNext()
+    @Override
+    public boolean hasNext()
     {
       return !holders.isEmpty();
     }
 
-    /**
-     * Returns the latest holder.
-     */
-    @Nullable
-    TimelineObjectHolder<String, DataSegment> get()
+    @Override
+    public List<DataSegment> next()
     {
       if (holders.isEmpty()) {
-        return null;
-      } else {
-        return holders.get(holders.size() - 1);
-      }
-    }
-
-    /**
-     * Removes the latest holder, so that {@link #get()} returns the next one.
-     */
-    void next()
-    {
-      if (!holders.isEmpty()) {
-        holders.remove(holders.size() - 1);
+        throw new NoSuchElementException();
       }
+      return holders.remove(holders.size() - 1)
+                    .getObject()
+                    .stream()
+                    .map(PartitionChunk::getObject)
+                    .collect(Collectors.toList());
     }
   }
 
@@ -254,103 +246,59 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   )
   {
     final long inputSegmentSize = config.getInputSegmentSizeBytes();
+    final @Nullable Long targetCompactionSizeBytes = config.getTargetCompactionSizeBytes();
     final int maxNumSegmentsToCompact = config.getMaxNumSegmentsToCompact();
-    final SegmentsToCompact segmentsToCompact = new SegmentsToCompact();
 
     // Finds segments to compact together while iterating timeline from latest to oldest
-    while (compactibleTimelineObjectHolderCursor.hasNext()
-           && segmentsToCompact.getTotalSize() < inputSegmentSize
-           && segmentsToCompact.getNumSegments() < maxNumSegmentsToCompact) {
-      final TimelineObjectHolder<String, DataSegment> timeChunkHolder = Preconditions.checkNotNull(
-          compactibleTimelineObjectHolderCursor.get(),
-          "timelineObjectHolder"
+    while (compactibleTimelineObjectHolderCursor.hasNext()) {
+      final SegmentsToCompact candidates = new SegmentsToCompact(compactibleTimelineObjectHolderCursor.next());
+      final boolean isCompactibleSize = candidates.getTotalSize() <= inputSegmentSize;
+      final boolean isCompactibleNum = candidates.getNumSegments() <= maxNumSegmentsToCompact;
+      final boolean needsCompaction = SegmentCompactorUtil.needsCompaction(
+          targetCompactionSizeBytes,
+          candidates.segments
       );
-      final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(timeChunkHolder.getObject().iterator());
-      final long timeChunkSizeBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
 
-      final boolean isSameOrAbuttingInterval;
-      final Interval lastInterval = segmentsToCompact.getIntervalOfLastSegment();
-      if (lastInterval == null) {
-        isSameOrAbuttingInterval = true;
+      if (isCompactibleSize && isCompactibleNum && needsCompaction) {
+        return candidates;
       } else {
-        final Interval currentInterval = chunks.get(0).getObject().getInterval();
-        isSameOrAbuttingInterval = currentInterval.isEqual(lastInterval) || currentInterval.abuts(lastInterval);
-      }
-
-      // The segments in a holder should be added all together or not at all.
-      final boolean isCompactibleSize = SegmentCompactorUtil.isCompactibleSize(
-          inputSegmentSize,
-          segmentsToCompact.getTotalSize(),
-          timeChunkSizeBytes
-      );
-      final boolean isCompactibleNum = SegmentCompactorUtil.isCompactibleNum(
-          maxNumSegmentsToCompact,
-          segmentsToCompact.getNumSegments(),
-          chunks.size()
-      );
-      if (isCompactibleSize
-          && isCompactibleNum
-          && isSameOrAbuttingInterval
-          && segmentsToCompact.isEmpty()) {
-        chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-      } else {
-        if (segmentsToCompact.getNumSegments() > 1) {
-          // We found some segments to compact and cannot add more. End here.
-          return segmentsToCompact;
-        } else {
-          if (!SegmentCompactorUtil.isCompactibleSize(inputSegmentSize, 0, timeChunkSizeBytes)) {
-            final DataSegment segment = chunks.get(0).getObject();
-            segmentsToCompact.clear();
-            log.warn(
-                "shardSize[%d] for dataSource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
-                + " Continue to the next shard.",
-                timeChunkSizeBytes,
-                segment.getDataSource(),
-                segment.getInterval(),
-                inputSegmentSize
-            );
-          } else if (maxNumSegmentsToCompact < chunks.size()) {
-            final DataSegment segment = chunks.get(0).getObject();
-            segmentsToCompact.clear();
-            log.warn(
-                "The number of segments[%d] for dataSource[%s] and interval[%s] is larger than "
-                + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
-                + "segments, consider increasing 'numTargetCompactionSegments' and "
-                + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next shard.",
-                chunks.size(),
-                segment.getDataSource(),
-                segment.getInterval(),
-                maxNumSegmentsToCompact
-            );
-          } else {
-            if (segmentsToCompact.getNumSegments() == 1) {
-              // We found a segment which is smaller than targetCompactionSize but too large to compact with other
-              // segments. Skip this one.
-              segmentsToCompact.clear();
-              chunks.forEach(chunk -> segmentsToCompact.add(chunk.getObject()));
-            } else {
-              throw new ISE(
-                  "Cannot compact segments[%s]. shardBytes[%s], numSegments[%s] "
-                  + "with current segmentsToCompact[%s]",
-                  chunks.stream().map(PartitionChunk::getObject).collect(Collectors.toList()),
-                  timeChunkSizeBytes,
-                  chunks.size(),
-                  segmentsToCompact
-              );
-            }
-          }
+        if (!isCompactibleSize) {
+          log.warn(
+              "total segment size[%d] for datasource[%s] and interval[%s] is larger than inputSegmentSize[%d]."
+              + " Continue to the next interval.",
+              candidates.getTotalSize(),
+              candidates.segments.get(0).getDataSource(),
+              candidates.segments.get(0).getInterval(),
+              inputSegmentSize
+          );
+        }
+        if (!isCompactibleNum) {
+          log.warn(
+              "Number of segments[%d] for datasource[%s] and interval[%s] is larger than "
+              + "maxNumSegmentsToCompact[%d]. If you see lots of shards are being skipped due to too many "
+              + "segments, consider increasing 'numTargetCompactionSegments' and "
+              + "'druid.indexer.runner.maxZnodeBytes'. Continue to the next interval.",
+              candidates.getNumSegments(),
+              candidates.segments.get(0).getDataSource(),
+              candidates.segments.get(0).getInterval(),
+              maxNumSegmentsToCompact
+          );
+        }
+        if (!needsCompaction) {
+          log.warn(
+              "Size of most of segments[%s] is larger than targetCompactionSizeBytes[%s] "
+              + "for datasource[%s] and interval[%s]. Skipping compaction for this interval.",
+              candidates.segments.stream().map(DataSegment::getSize).collect(Collectors.toList()),
+              targetCompactionSizeBytes,
+              candidates.segments.get(0).getDataSource(),
+              candidates.segments.get(0).getInterval()
+          );
         }
       }
-
-      compactibleTimelineObjectHolderCursor.next();
-    }
-
-    if (segmentsToCompact.getNumSegments() == 1) {
-      // Don't compact a single segment
-      segmentsToCompact.clear();
     }
 
-    return segmentsToCompact;
+    // Return an empty SegmentsToCompact if no candidate time chunk qualified
+    return new SegmentsToCompact();
   }
 
   /**
@@ -510,29 +458,18 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   private static class SegmentsToCompact
   {
-    private final List<DataSegment> segments = new ArrayList<>();
-    private long totalSize;
-
-    private void add(DataSegment segment)
-    {
-      segments.add(segment);
-      totalSize += segment.getSize();
-    }
+    private final List<DataSegment> segments;
+    private final long totalSize;
 
-    private boolean isEmpty()
+    private SegmentsToCompact()
     {
-      Preconditions.checkState((totalSize == 0) == segments.isEmpty());
-      return segments.isEmpty();
+      this(Collections.emptyList());
     }
 
-    @Nullable
-    private Interval getIntervalOfLastSegment()
+    private SegmentsToCompact(List<DataSegment> segments)
     {
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return segments.get(segments.size() - 1).getInterval();
-      }
+      this.segments = segments;
+      this.totalSize = segments.stream().mapToLong(DataSegment::getSize).sum();
     }
 
     private int getNumSegments()
@@ -545,12 +482,6 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       return totalSize;
     }
 
-    private void clear()
-    {
-      segments.clear();
-      totalSize = 0;
-    }
-
     @Override
     public String toString()
     {
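
The cursor rewrite above replaces the nullable get()/next() peek-and-drop pair
with the standard Iterator protocol: hasNext() reports whether any holder
remains, and next() pops the newest holder and flattens its PartitionChunks
into a List<DataSegment>, throwing NoSuchElementException once exhausted.
Together with the chunks.size() > 1 filter in the constructor, every returned
list holds at least two segments, which is why the old "don't compact a single
segment" cleanup disappears. A minimal sketch of the contract (illustrative;
the cursor class is a private nested type, so construction is elided, and
countCandidateBatches is a hypothetical helper):

    // Each next() call drains one time chunk, newest first.
    static int countCandidateBatches(Iterator<List<DataSegment>> cursor)
    {
      int batches = 0;
      while (cursor.hasNext()) {
        final List<DataSegment> timeChunk = cursor.next(); // always >= 2 segments
        batches++;
      }
      return batches; // calling cursor.next() now would throw NoSuchElementException
    }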
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
index 08a651e..d68c3a0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/SegmentCompactorUtil.java
@@ -20,21 +20,36 @@
 package org.apache.druid.server.coordinator.helper;
 
 import com.google.common.base.Preconditions;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
+import java.util.List;
+
 /**
  * Util class used by {@link DruidCoordinatorSegmentCompactor} and {@link CompactionSegmentSearchPolicy}.
  */
 class SegmentCompactorUtil
 {
-  static boolean isCompactibleSize(long targetBytes, long currentTotalBytes, long additionalBytes)
-  {
-    return currentTotalBytes + additionalBytes <= targetBytes;
-  }
+  /**
+   * The allowed error rate of the segment size after compaction.
+   * Its value is determined experimentally.
+   */
+  private static final double ALLOWED_ERROR_OF_SEGMENT_SIZE = .2;
 
-  static boolean isCompactibleNum(int numTargetSegments, int numCurrentSegments, int numAdditionalSegments)
+  static boolean needsCompaction(@Nullable Long targetCompactionSizeBytes, List<DataSegment> candidates)
   {
-    return numCurrentSegments + numAdditionalSegments <= numTargetSegments;
+    if (targetCompactionSizeBytes == null) {
+      // If targetCompactionSizeBytes is null, we have no way to check whether the given segments need compaction.
+      return true;
+    }
+    final double minTargetThreshold = targetCompactionSizeBytes * (1 - ALLOWED_ERROR_OF_SEGMENT_SIZE);
+    final double maxTargetThreshold = targetCompactionSizeBytes * (1 + ALLOWED_ERROR_OF_SEGMENT_SIZE);
+
+    return candidates
+        .stream()
+        .filter(segment -> segment.getSize() < minTargetThreshold || segment.getSize() > maxTargetThreshold)
+        .count() > 1;
   }
 
   /**
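
With ALLOWED_ERROR_OF_SEGMENT_SIZE = 0.2, a targetCompactionSizeBytes of, say,
800 accepts segment sizes in the band [640, 960], and an interval is compacted
only when more than one segment falls outside that band. A standalone
re-implementation of the arithmetic for illustration (NeedsCompactionSketch is
a hypothetical class, not the Druid one; List.of needs Java 9+):

    import java.util.List;

    // Standalone illustration of the needsCompaction() threshold arithmetic.
    class NeedsCompactionSketch
    {
      private static final double ALLOWED_ERROR = 0.2;

      static boolean needsCompaction(Long targetBytes, List<Long> segmentSizes)
      {
        if (targetBytes == null) {
          return true; // no target configured: nothing to compare against
        }
        final double min = targetBytes * (1 - ALLOWED_ERROR);
        final double max = targetBytes * (1 + ALLOWED_ERROR);
        // compact only if more than one segment lies outside [min, max]
        return segmentSizes.stream().filter(s -> s < min || s > max).count() > 1;
      }

      public static void main(String[] args)
      {
        // target 800: accepted band is [640, 960]
        System.out.println(needsCompaction(800L, List.of(100L, 120L, 700L))); // true: two segments below 640
        System.out.println(needsCompaction(800L, List.of(790L, 820L, 100L))); // false: only one outlier
      }
    }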
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 0eb8b39..fcc1053 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -38,7 +38,6 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
-import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -79,18 +78,6 @@ public class DruidCoordinatorSegmentCompactorTest
           segments.get(0).getInterval().getStart(),
           segments.get(segments.size() - 1).getInterval().getEnd()
       );
-      DataSegment compactSegment = new DataSegment(
-          segments.get(0).getDataSource(),
-          compactInterval,
-          "newVersion_" + compactVersionSuffix++,
-          null,
-          segments.get(0).getDimensions(),
-          segments.get(0).getMetrics(),
-          NoneShardSpec.instance(),
-          1,
-          segments.stream().mapToLong(DataSegment::getSize).sum()
-      );
-
       final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(segments.get(0).getDataSource());
       segments.forEach(
           segment -> timeline.remove(
@@ -99,11 +86,28 @@ public class DruidCoordinatorSegmentCompactorTest
               segment.getShardSpec().createChunk(segment)
           )
       );
-      timeline.add(
-          compactInterval,
-          compactSegment.getVersion(),
-          compactSegment.getShardSpec().createChunk(compactSegment)
-      );
+      final String version = "newVersion_" + compactVersionSuffix++;
+      final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
+      for (int i = 0; i < 2; i++) {
+        DataSegment compactSegment = new DataSegment(
+            segments.get(0).getDataSource(),
+            compactInterval,
+            version,
+            null,
+            segments.get(0).getDimensions(),
+            segments.get(0).getMetrics(),
+            new NumberedShardSpec(i, 0),
+            1,
+            segmentSize
+        );
+
+        timeline.add(
+            compactInterval,
+            compactSegment.getVersion(),
+            compactSegment.getShardSpec().createChunk(compactSegment)
+        );
+      }
+
       return "task_" + idSuffix++;
     }
 
@@ -129,7 +133,7 @@ public class DruidCoordinatorSegmentCompactorTest
     for (int i = 0; i < 3; i++) {
       final String dataSource = DATA_SOURCE_PREFIX + i;
       for (int j : new int[] {0, 1, 2, 3, 7, 8}) {
-        for (int k = 0; k < 2; k++) {
+        for (int k = 0; k < 4; k++) {
           segments.add(createSegment(dataSource, j, true, k));
           segments.add(createSegment(dataSource, j, false, k));
         }
@@ -187,7 +191,7 @@ public class DruidCoordinatorSegmentCompactorTest
       }
     };
     int expectedCompactTaskCount = 1;
-    int expectedRemainingSegments = 200;
+    int expectedRemainingSegments = 400;
 
     // compact for 2017-01-08T12:00:00.000Z/2017-01-09T12:00:00.000Z
     assertCompactSegments(
@@ -197,7 +201,7 @@ public class DruidCoordinatorSegmentCompactorTest
         expectedCompactTaskCount,
         expectedVersionSupplier
     );
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 8, 9),
@@ -207,7 +211,7 @@ public class DruidCoordinatorSegmentCompactorTest
     );
 
     // compact for 2017-01-07T12:00:00.000Z/2017-01-08T12:00:00.000Z
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", 8, 8),
@@ -215,7 +219,7 @@ public class DruidCoordinatorSegmentCompactorTest
         expectedCompactTaskCount,
         expectedVersionSupplier
     );
-    expectedRemainingSegments -= 20;
+    expectedRemainingSegments -= 40;
     assertCompactSegments(
         compactor,
         Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", 4, 5),
@@ -225,7 +229,7 @@ public class DruidCoordinatorSegmentCompactorTest
     );
 
     for (int endDay = 4; endDay > 1; endDay -= 1) {
-      expectedRemainingSegments -= 20;
+      expectedRemainingSegments -= 40;
       assertCompactSegments(
           compactor,
           Intervals.of("2017-01-%02dT00:00:00/2017-01-%02dT12:00:00", endDay, endDay),
@@ -233,7 +237,7 @@ public class DruidCoordinatorSegmentCompactorTest
           expectedCompactTaskCount,
           expectedVersionSupplier
       );
-      expectedRemainingSegments -= 20;
+      expectedRemainingSegments -= 40;
       assertCompactSegments(
           compactor,
           Intervals.of("2017-01-%02dT12:00:00/2017-01-%02dT00:00:00", endDay - 1, endDay),
@@ -296,10 +300,12 @@ public class DruidCoordinatorSegmentCompactorTest
       List<TimelineObjectHolder<String, DataSegment>> holders = dataSources.get(dataSource).lookup(expectedInterval);
       Assert.assertEquals(1, holders.size());
       List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holders.get(0).getObject());
-      Assert.assertEquals(1, chunks.size());
-      DataSegment segment = chunks.get(0).getObject();
-      Assert.assertEquals(expectedInterval, segment.getInterval());
-      Assert.assertEquals(expectedVersionSupplier.get(), segment.getVersion());
+      Assert.assertEquals(2, chunks.size());
+      final String expectedVersion = expectedVersionSupplier.get();
+      for (PartitionChunk<DataSegment> chunk : chunks) {
+        Assert.assertEquals(expectedInterval, chunk.getObject().getInterval());
+        Assert.assertEquals(expectedVersion, chunk.getObject().getVersion());
+      }
     }
   }
 
@@ -313,7 +319,7 @@ public class DruidCoordinatorSegmentCompactorTest
       Assert.assertEquals(1, holders.size());
       for (TimelineObjectHolder<String, DataSegment> holder : holders) {
         List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject());
-        Assert.assertEquals(2, chunks.size());
+        Assert.assertEquals(4, chunks.size());
         for (PartitionChunk<DataSegment> chunk : chunks) {
           DataSegment segment = chunk.getObject();
           Assert.assertEquals(interval, segment.getInterval());
@@ -369,7 +375,7 @@ public class DruidCoordinatorSegmentCompactorTest
               dataSource,
               0,
               50L,
-              50L,
+              20L,
               null,
               null,
               new Period("PT1H"), // smaller than segment interval

