Posted to commits@druid.apache.org by fj...@apache.org on 2018/12/28 02:03:52 UTC

[incubator-druid] branch master updated: Fix auto compaction to consider intervals of running tasks (#6767)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new fa7cb90  Fix auto compaction to consider intervals of running tasks (#6767)
fa7cb90 is described below

commit fa7cb906e40021549648848caada38cf80338b7b
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Dec 27 18:03:44 2018 -0800

    Fix auto compaction to consider intervals of running tasks (#6767)
    
    * Fix auto compaction to consider intervals of running tasks
    
    * adjust initial collection size
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   3 +-
 .../druid/client/indexing/ClientAppendQuery.java   |   4 +-
 .../druid/client/indexing/ClientCompactQuery.java  |   4 +-
 .../druid/client/indexing/ClientKillQuery.java     |   4 +-
 .../druid/client/indexing/ClientMergeQuery.java    |   4 +-
 .../{ClientKillQuery.java => ClientQuery.java}     |  47 ++-----
 .../client/indexing/HttpIndexingServiceClient.java |  20 +++
 .../client/indexing/IndexingServiceClient.java     |   3 +
 ...ientKillQuery.java => TaskPayloadResponse.java} |  36 ++---
 .../helper/CompactionSegmentSearchPolicy.java      |   5 +-
 .../helper/DruidCoordinatorSegmentCompactor.java   |  44 ++++--
 .../helper/NewestSegmentFirstIterator.java         | 153 ++++++++++++++++-----
 .../helper/NewestSegmentFirstPolicy.java           |   7 +-
 .../client/indexing/NoopIndexingServiceClient.java |   6 +
 .../helper/NewestSegmentFirstIteratorTest.java     |  76 ++++++++++
 .../helper/NewestSegmentFirstPolicyTest.java       |  76 ++++++++--
 16 files changed, 381 insertions(+), 111 deletions(-)

diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 760c0a5..658a331 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -42,6 +42,7 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.infra.Blackhole;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,7 +138,7 @@ public class NewestSegmentFirstPolicyBenchmark
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       final List<DataSegment> segments = iterator.next();
       blackhole.consume(segments);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
index 4271200..c5497b8 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 /**
  */
-public class ClientAppendQuery
+public class ClientAppendQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -43,12 +43,14 @@ public class ClientAppendQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "append";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index 1fe2b04..eec9004 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -26,7 +26,7 @@ import org.apache.druid.timeline.DataSegment;
 import java.util.List;
 import java.util.Map;
 
-public class ClientCompactQuery
+public class ClientCompactQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -54,12 +54,14 @@ public class ClientCompactQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "compact";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
index 4d97246..06d88f9 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
@@ -25,7 +25,7 @@ import org.joda.time.Interval;
 
 /**
  */
-public class ClientKillQuery
+public class ClientKillQuery implements ClientQuery
 {
   private final String dataSource;
   private final Interval interval;
@@ -41,12 +41,14 @@ public class ClientKillQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "kill";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
index 3800953..b61c6e6 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 /**
  */
-public class ClientMergeQuery
+public class ClientMergeQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -48,12 +48,14 @@ public class ClientMergeQuery
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "merge";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
similarity index 55%
copy from server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
copy to server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
index 4d97246..6dbd631 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
@@ -19,42 +19,23 @@
 
 package org.apache.druid.client.indexing;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.joda.time.Interval;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 /**
+ * org.apache.druid.indexing.common.task.Task representation for clients
  */
-public class ClientKillQuery
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "append", value = ClientAppendQuery.class),
+    @Type(name = "merge", value = ClientMergeQuery.class),
+    @Type(name = "kill", value = ClientKillQuery.class),
+    @Type(name = "compact", value = ClientCompactQuery.class)
+})
+public interface ClientQuery
 {
-  private final String dataSource;
-  private final Interval interval;
+  String getType();
 
-  @JsonCreator
-  public ClientKillQuery(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
-  )
-  {
-    this.dataSource = dataSource;
-    this.interval = interval;
-  }
-
-  @JsonProperty
-  public String getType()
-  {
-    return "kill";
-  }
-
-  @JsonProperty
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public Interval getInterval()
-  {
-    return interval;
-  }
+  String getDataSource();
 }
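
A note for readers skimming the diff: the new @JsonTypeInfo/@JsonSubTypes annotations are what let Jackson resolve the concrete ClientQuery subtype from the "type" field when the coordinator reads a task payload back from the overlord. A minimal sketch, not part of this commit (the JSON body is hypothetical, and it assumes a mapper that can read Joda types, e.g. Druid's DefaultObjectMapper):

    final ObjectMapper mapper = new DefaultObjectMapper();
    final ClientQuery query = mapper.readValue(
        "{\"type\":\"kill\",\"dataSource\":\"wiki\",\"interval\":\"2018-01-01/2018-02-01\"}",
        ClientQuery.class
    );
    // "type":"kill" resolves to ClientKillQuery via the @JsonSubTypes mapping above;
    // query.getDataSource() -> "wiki"
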
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 09a4753..b8960c0 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -279,6 +279,26 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    try {
+      final FullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+      );
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<TaskPayloadResponse>()
+          {
+          }
+      );
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public int killPendingSegments(String dataSource, DateTime end)
   {
     final String endPoint = StringUtils.format(
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index d9d35f5..8a54bb4 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -61,4 +61,7 @@ public interface IndexingServiceClient
 
   @Nullable
   TaskStatusPlus getLastCompleteTask();
+
+  @Nullable
+  TaskPayloadResponse getTaskPayload(String taskId);
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
similarity index 65%
copy from server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
copy to server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
index 4d97246..1b938af 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
@@ -21,40 +21,40 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.joda.time.Interval;
 
-/**
- */
-public class ClientKillQuery
+public class TaskPayloadResponse
 {
-  private final String dataSource;
-  private final Interval interval;
+  private final String task;
+  private final ClientQuery payload;
 
   @JsonCreator
-  public ClientKillQuery(
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval
+  public TaskPayloadResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("payload") final ClientQuery payload
   )
   {
-    this.dataSource = dataSource;
-    this.interval = interval;
+    this.task = task;
+    this.payload = payload;
   }
 
   @JsonProperty
-  public String getType()
+  public String getTask()
   {
-    return "kill";
+    return task;
   }
 
   @JsonProperty
-  public String getDataSource()
+  public ClientQuery getPayload()
   {
-    return dataSource;
+    return payload;
   }
 
-  @JsonProperty
-  public Interval getInterval()
+  @Override
+  public String toString()
   {
-    return interval;
+    return "TaskPayloadResponse{" +
+           "task='" + task + '\'' +
+           ", payload=" + payload +
+           '}';
   }
 }
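
This class mirrors the envelope the overlord returns for a single task: the task id plus its payload, with the payload deserialized polymorphically as a ClientQuery. A sketch of the wire format (field values hypothetical; the overlord's real task JSON carries more fields than shown):

    // GET /druid/indexer/v1/task/{taskId} ->
    //   {"task":"compact_wiki_...","payload":{"type":"compact","dataSource":"wiki","segments":[...]}}
    final TaskPayloadResponse response = jsonMapper.readValue(body, TaskPayloadResponse.class);
    response.getTask();               // the task id
    response.getPayload().getType();  // "compact", so the payload is a ClientCompactQuery
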
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
index db86163..fddec4b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,6 +37,7 @@ public interface CompactionSegmentSearchPolicy
    */
   CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   );
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 82be5c4..ec189c0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -21,9 +21,13 @@ package org.apache.druid.server.coordinator.helper;
 
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.client.indexing.ClientCompactQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -32,10 +36,12 @@ import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegmentUtils;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -77,23 +83,46 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
         Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
             .stream()
             .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-        final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks(
+        final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
             indexingServiceClient.getRunningTasks(),
             indexingServiceClient.getPendingTasks(),
             indexingServiceClient.getWaitingTasks()
         );
-        final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+        // dataSource -> list of intervals of compact tasks
+        final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
+        for (TaskStatusPlus status : compactTasks) {
+          final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
+          if (response == null) {
+            throw new ISE("WTH? got a null paylord from overlord for task[%s]", status.getId());
+          }
+          if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
+            final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
+            final Interval interval = JodaUtils.umbrellaInterval(
+                compactQuery.getSegments()
+                            .stream()
+                            .map(DataSegment::getInterval)
+                            .sorted(Comparators.intervalsByStartThenEnd())
+                            .collect(Collectors.toList())
+            );
+            compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+          } else {
+            throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
+          }
+        }
+
+        final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
 
         final int compactionTaskCapacity = (int) Math.min(
             indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
             dynamicConfig.getMaxCompactionTaskSlots()
         );
-        final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ?
-                                                    compactionTaskCapacity - numNonCompleteCompactionTasks :
+        final int numNonCompleteCompactionTasks = compactTasks.size();
+        final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
+                                                    ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
                                                     // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
                                                     // This guarantees that at least one slot is available if
                                                     // compaction is enabled and numRunningCompactTasks is 0.
-                                                    Math.max(1, compactionTaskCapacity);
+                                                    : Math.max(1, compactionTaskCapacity);
         LOG.info(
             "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
             numAvailableCompactionTaskSlots,
@@ -117,7 +146,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
   }
 
   @SafeVarargs
-  private static int findNumNonCompleteCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
+  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
   {
     final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
     Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
@@ -132,8 +161,7 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
           // performance.
           return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
         })
-        .collect(Collectors.toList())
-        .size();
+        .collect(Collectors.toList());
   }
 
   private CoordinatorStats doRun(
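
For concreteness, a worked example of the slot arithmetic above, with hypothetical numbers; note the new Math.max(0, ...) clamp, which keeps the count non-negative when more compact tasks are in flight than the capacity allows:

    final int totalWorkerCapacity = 10;          // assumed cluster-wide worker capacity
    final double compactionTaskSlotRatio = 0.1;  // assumed dynamic-config value
    final int maxCompactionTaskSlots = 100;
    final int compactionTaskCapacity =
        (int) Math.min(totalWorkerCapacity * compactionTaskSlotRatio, maxCompactionTaskSlots); // = 1
    final int numNonCompleteCompactionTasks = 2;  // e.g., two compact tasks already running
    final int numAvailableCompactionTaskSlots =
        numNonCompleteCompactionTasks > 0
        ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)  // = 0 rather than -1
        : Math.max(1, compactionTaskCapacity);
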
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index d114fff..8c0d80d 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.server.coordinator.helper;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -67,7 +69,8 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
   NewestSegmentFirstIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
     this.compactionConfigs = compactionConfigs;
@@ -80,9 +83,9 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
 
       if (config != null && !timeline.isEmpty()) {
-        final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        if (searchInterval != null) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        final List<Interval> searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+        if (!searchIntervals.isEmpty()) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
         }
       }
     }
@@ -186,19 +189,22 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     CompactibleTimelineObjectHolderCursor(
         VersionedIntervalTimeline<String, DataSegment> timeline,
-        Interval totalIntervalToSearch
+        List<Interval> totalIntervalsToSearch
     )
     {
-      this.holders = timeline
-          .lookup(totalIntervalToSearch)
+      this.holders = totalIntervalsToSearch
           .stream()
-          .filter(holder -> {
-            final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
-            final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-            return chunks.size() > 0
-                   && partitionBytes > 0
-                   && totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval());
-          })
+          .flatMap(interval -> timeline
+              .lookup(interval)
+              .stream()
+              .filter(holder -> {
+                final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
+                final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
+                return chunks.size() > 0
+                       && partitionBytes > 0
+                       && interval.contains(chunks.get(0).getObject().getInterval());
+              })
+          )
           .collect(Collectors.toList());
     }
 
@@ -339,15 +345,15 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
   /**
    * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
    *
-   * @param timeline   timeline of a dataSource
-   * @param skipOffset skipOFfset
+   * @param timeline      timeline of a dataSource
+   * @param skipIntervals intervals to skip
    *
    * @return found interval to search or null if it's not found
    */
-  @Nullable
-  private static Interval findInitialSearchInterval(
+  private static List<Interval> findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
-      Period skipOffset
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
   )
   {
     Preconditions.checkArgument(timeline != null && !timeline.isEmpty(), "timeline should not be null or empty");
@@ -355,35 +361,118 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
 
     final TimelineObjectHolder<String, DataSegment> first = Preconditions.checkNotNull(timeline.first(), "first");
     final TimelineObjectHolder<String, DataSegment> last = Preconditions.checkNotNull(timeline.last(), "last");
+    final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
+        last.getInterval().getEnd(),
+        skipOffset,
+        skipIntervals
+    );
 
-    final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
+    final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
+    final List<Interval> filteredInterval = filterSkipIntervals(totalInterval, fullSkipIntervals);
+    final List<Interval> searchIntervals = new ArrayList<>();
 
-    final DateTime lookupStart = first.getInterval().getStart();
-    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
-    if (lookupStart.isBefore(lookupEnd)) {
+    for (Interval lookupInterval : filteredInterval) {
       final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-          new Interval(lookupStart, lookupEnd)
+          new Interval(lookupInterval.getStart(), lookupInterval.getEnd())
       );
 
       final List<DataSegment> segments = holders
           .stream()
           .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
           .map(PartitionChunk::getObject)
-          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .filter(segment -> lookupInterval.contains(segment.getInterval()))
           .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
           .collect(Collectors.toList());
 
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return new Interval(
-            segments.get(0).getInterval().getStart(),
-            segments.get(segments.size() - 1).getInterval().getEnd()
+      if (!segments.isEmpty()) {
+        searchIntervals.add(
+            new Interval(
+                segments.get(0).getInterval().getStart(),
+                segments.get(segments.size() - 1).getInterval().getEnd()
+            )
         );
       }
+    }
+
+    return searchIntervals;
+  }
+
+  @VisibleForTesting
+  static List<Interval> sortAndAddSkipIntervalFromLatest(
+      DateTime latest,
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
+  )
+  {
+    final List<Interval> nonNullSkipIntervals = skipIntervals == null
+                                                ? new ArrayList<>(1)
+                                                : new ArrayList<>(skipIntervals.size());
+
+    if (skipIntervals != null) {
+      final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals);
+      sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+      final List<Interval> overlapIntervals = new ArrayList<>();
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+
+      for (Interval interval : sortedSkipIntervals) {
+        if (interval.overlaps(skipFromLatest)) {
+          overlapIntervals.add(interval);
+        } else {
+          nonNullSkipIntervals.add(interval);
+        }
+      }
+
+      if (!overlapIntervals.isEmpty()) {
+        overlapIntervals.add(skipFromLatest);
+        nonNullSkipIntervals.add(JodaUtils.umbrellaInterval(overlapIntervals));
+      } else {
+        nonNullSkipIntervals.add(skipFromLatest);
+      }
     } else {
-      return null;
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+      nonNullSkipIntervals.add(skipFromLatest);
+    }
+
+    return nonNullSkipIntervals;
+  }
+
+  /**
+   * Returns a list of intervals which are contained by totalInterval but don't overlap with skipIntervals.
+   *
+   * @param totalInterval total interval
+   * @param skipIntervals intervals to skip. This should be sorted by {@link Comparators#intervalsByStartThenEnd()}.
+   */
+  @VisibleForTesting
+  static List<Interval> filterSkipIntervals(Interval totalInterval, List<Interval> skipIntervals)
+  {
+    final List<Interval> filteredIntervals = new ArrayList<>(skipIntervals.size() + 1);
+
+    DateTime remainingStart = totalInterval.getStart();
+    DateTime remainingEnd = totalInterval.getEnd();
+    for (Interval skipInterval : skipIntervals) {
+      if (skipInterval.getStart().isBefore(remainingStart) && skipInterval.getEnd().isAfter(remainingStart)) {
+        remainingStart = skipInterval.getEnd();
+      } else if (skipInterval.getStart().isBefore(remainingEnd) && skipInterval.getEnd().isAfter(remainingEnd)) {
+        remainingEnd = skipInterval.getStart();
+      } else if (!remainingStart.isAfter(skipInterval.getStart()) && !remainingEnd.isBefore(skipInterval.getEnd())) {
+        filteredIntervals.add(new Interval(remainingStart, skipInterval.getStart()));
+        remainingStart = skipInterval.getEnd();
+      } else {
+        // Ignore this skipInterval
+        log.warn(
+            "skipInterval[%s] is not contained in remainingInterval[%s]",
+            skipInterval,
+            new Interval(remainingStart, remainingEnd)
+        );
+      }
+    }
+
+    if (!remainingStart.equals(remainingEnd)) {
+      filteredIntervals.add(new Interval(remainingStart, remainingEnd));
     }
+
+    return filteredIntervals;
   }
 
   private static class QueueEntry
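
A small walk-through of the new filterSkipIntervals helper with hypothetical data: the sorted skip intervals are carved out of the total interval, and whatever remains becomes the search space:

    final Interval total = Intervals.of("2018-01-01/2018-01-10");
    final List<Interval> remaining = NewestSegmentFirstIterator.filterSkipIntervals(
        total,
        ImmutableList.of(
            Intervals.of("2018-01-02/2018-01-03"),
            Intervals.of("2018-01-05/2018-01-06")
        )
    );
    // remaining = [2018-01-01/2018-01-02, 2018-01-03/2018-01-05, 2018-01-06/2018-01-10]
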
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
index 19b882e..8a41182 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
@@ -22,7 +22,9 @@ package org.apache.druid.server.coordinator.helper;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,9 +35,10 @@ public class NewestSegmentFirstPolicy implements CompactionSegmentSearchPolicy
   @Override
   public CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(compactionConfigs, dataSources);
+    return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index d395a6c..01f84df 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -109,4 +109,10 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   {
     return null;
   }
+
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    return null;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
new file mode 100644
index 0000000..0e5f333
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.helper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NewestSegmentFirstIteratorTest
+{
+  @Test
+  public void testFilterSkipIntervals()
+  {
+    final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
+    final List<Interval> expectedSkipIntervals = ImmutableList.of(
+        Intervals.of("2018-01-15/2018-03-02"),
+        Intervals.of("2018-07-23/2018-10-01"),
+        Intervals.of("2018-10-02/2018-12-25"),
+        Intervals.of("2018-12-31/2019-01-01")
+    );
+    final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
+        totalInterval,
+        Lists.newArrayList(
+            Intervals.of("2017-12-01/2018-01-15"),
+            Intervals.of("2018-03-02/2018-07-23"),
+            Intervals.of("2018-10-01/2018-10-02"),
+            Intervals.of("2018-12-25/2018-12-31")
+        )
+    );
+
+    Assert.assertEquals(expectedSkipIntervals, skipIntervals);
+  }
+
+  @Test
+  public void testAddSkipIntervalFromLatestAndSort()
+  {
+    final List<Interval> expectedIntervals = ImmutableList.of(
+        Intervals.of("2018-12-24/2018-12-25"),
+        Intervals.of("2018-12-29/2019-01-01")
+    );
+    final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
+        DateTimes.of("2019-01-01"),
+        new Period(72, 0, 0, 0), // Period(hours, minutes, seconds, millis): 72 hours before 2019-01-01 is 2018-12-29
+        ImmutableList.of(
+            Intervals.of("2018-12-30/2018-12-31"),
+            Intervals.of("2018-12-24/2018-12-25")
+        )
+    );
+
+    Assert.assertEquals(expectedIntervals, fullSkipIntervals);
+  }
+}
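
To spell out the second test's expectation: the 72-hour offset from 2019-01-01 produces a skip-from-latest interval of 2018-12-29/2019-01-01; it overlaps 2018-12-30/2018-12-31, so the two are merged via JodaUtils.umbrellaInterval, while the non-overlapping 2018-12-24/2018-12-25 passes through unchanged.
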
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 5ddbaab..8fe3e37 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -78,7 +78,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -102,7 +103,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -178,7 +180,8 @@ public class NewestSegmentFirstPolicyTest
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -211,7 +214,8 @@ public class NewestSegmentFirstPolicyTest
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -258,7 +262,8 @@ public class NewestSegmentFirstPolicyTest
                     DEFAULT_NUM_SEGMENTS_PER_SHARD
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -313,7 +318,8 @@ public class NewestSegmentFirstPolicyTest
                     80
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -392,7 +398,8 @@ public class NewestSegmentFirstPolicyTest
                     150
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -416,7 +423,8 @@ public class NewestSegmentFirstPolicyTest
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -449,7 +457,8 @@ public class NewestSegmentFirstPolicyTest
                     1
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -481,7 +490,8 @@ public class NewestSegmentFirstPolicyTest
     );
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     final List<DataSegment> expectedSegmentsToCompact = timeline
@@ -510,7 +520,8 @@ public class NewestSegmentFirstPolicyTest
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -530,12 +541,53 @@ public class NewestSegmentFirstPolicyTest
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testWithSkipIntervals()
+  {
+    final Period segmentPeriod = new Period("PT1H");
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
+                new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
+            )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            ImmutableList.of(
+                Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"),
+                Intervals.of("2017-11-15T00:00:00/2017-11-15T20:00:00"),
+                Intervals.of("2017-11-13T00:00:00/2017-11-14T01:00:00")
+            )
+        )
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-15T20:00:00/2017-11-15T21:00:00"),
+        Intervals.of("2017-11-15T23:00:00/2017-11-16T00:00:00"),
+        false
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-14T01:00:00/2017-11-14T02:00:00"),
+        Intervals.of("2017-11-14T23:00:00/2017-11-15T00:00:00"),
+        true
+    );
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,

