Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/12/28 02:03:46 UTC

[GitHub] fjy closed pull request #6767: Fix auto compaction to consider intervals of running tasks

URL: https://github.com/apache/incubator-druid/pull/6767

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index 760c0a523a3..658a3314904 100644
--- a/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -42,6 +42,7 @@
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.infra.Blackhole;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -137,7 +138,7 @@ public void setup()
   @Benchmark
   public void measureNewestSegmentFirstPolicy(Blackhole blackhole)
   {
-    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+    final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, Collections.emptyMap());
     for (int i = 0; i < numCompactionTaskSlots && iterator.hasNext(); i++) {
       final List<DataSegment> segments = iterator.next();
       blackhole.consume(segments);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
index 4271200e7f6..c5497b8cb9d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientAppendQuery.java
@@ -27,7 +27,7 @@
 
 /**
  */
-public class ClientAppendQuery
+public class ClientAppendQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -43,12 +43,14 @@ public ClientAppendQuery(
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "append";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
index 1fe2b0417ea..eec9004c752 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactQuery.java
@@ -26,7 +26,7 @@
 import java.util.List;
 import java.util.Map;
 
-public class ClientCompactQuery
+public class ClientCompactQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -54,12 +54,14 @@ public ClientCompactQuery(
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "compact";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
index 4d972463295..06d88f9535a 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientKillQuery.java
@@ -25,7 +25,7 @@
 
 /**
  */
-public class ClientKillQuery
+public class ClientKillQuery implements ClientQuery
 {
   private final String dataSource;
   private final Interval interval;
@@ -41,12 +41,14 @@ public ClientKillQuery(
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "kill";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
index 3800953950f..b61c6e6a11e 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientMergeQuery.java
@@ -28,7 +28,7 @@
 
 /**
  */
-public class ClientMergeQuery
+public class ClientMergeQuery implements ClientQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
@@ -48,12 +48,14 @@ public ClientMergeQuery(
   }
 
   @JsonProperty
+  @Override
   public String getType()
   {
     return "merge";
   }
 
   @JsonProperty
+  @Override
   public String getDataSource()
   {
     return dataSource;
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
new file mode 100644
index 00000000000..6dbd631baf5
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * org.apache.druid.indexing.common.task.Task representation for clients
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = "append", value = ClientAppendQuery.class),
+    @Type(name = "merge", value = ClientMergeQuery.class),
+    @Type(name = "kill", value = ClientKillQuery.class),
+    @Type(name = "compact", value = ClientCompactQuery.class)
+})
+public interface ClientQuery
+{
+  String getType();
+
+  String getDataSource();
+}
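
A note on the new ClientQuery interface above: its @JsonTypeInfo and
@JsonSubTypes annotations let Jackson pick the concrete class from the
"type" field when a task payload is read back from the overlord. A minimal
round-trip sketch; it assumes a Joda-aware mapper such as Druid's
DefaultObjectMapper and Jackson-annotated subtype constructors, and the
JSON string is purely illustrative:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.indexing.ClientKillQuery;
import org.apache.druid.client.indexing.ClientQuery;
import org.apache.druid.jackson.DefaultObjectMapper;

public class ClientQueryRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    // DefaultObjectMapper registers Joda-time handling, so the "interval"
    // string below can deserialize; a bare ObjectMapper would not suffice.
    final ObjectMapper mapper = new DefaultObjectMapper();
    final String json =
        "{\"type\":\"kill\",\"dataSource\":\"wiki\",\"interval\":\"2018-01-01/2018-01-02\"}";
    // Jackson reads the "type" field and instantiates ClientKillQuery.
    final ClientQuery query = mapper.readValue(json, ClientQuery.class);
    System.out.println(query instanceof ClientKillQuery);  // true
    System.out.println(query.getDataSource());             // wiki
  }
}
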
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index 09a4753ea3c..b8960c03c58 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -278,6 +278,26 @@ public TaskStatusPlus getLastCompleteTask()
     return completeTaskStatuses.isEmpty() ? null : completeTaskStatuses.get(0);
   }
 
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    try {
+      final FullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s", taskId))
+      );
+
+      return jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<TaskPayloadResponse>()
+          {
+          }
+      );
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   public int killPendingSegments(String dataSource, DateTime end)
   {
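
For reference, getTaskPayload above polls the overlord's
GET /druid/indexer/v1/task/{taskId} endpoint, whose response matches the
new TaskPayloadResponse class below: a wrapper holding the task id and its
payload. An illustrative, abbreviated response for a running compact task
(field values are made up):

{
  "task" : "compact_wiki_2018-12-27T00:00:00.000Z",
  "payload" : {
    "type" : "compact",
    "dataSource" : "wiki",
    "segments" : [ ... ]
  }
}
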
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index d9d35f5e624..8a54bb4c5eb 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -61,4 +61,7 @@ String compactSegments(
 
   @Nullable
   TaskStatusPlus getLastCompleteTask();
+
+  @Nullable
+  TaskPayloadResponse getTaskPayload(String taskId);
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
new file mode 100644
index 00000000000..1b938af3262
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/TaskPayloadResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class TaskPayloadResponse
+{
+  private final String task;
+  private final ClientQuery payload;
+
+  @JsonCreator
+  public TaskPayloadResponse(
+      @JsonProperty("task") final String task,
+      @JsonProperty("payload") final ClientQuery payload
+  )
+  {
+    this.task = task;
+    this.payload = payload;
+  }
+
+  @JsonProperty
+  public String getTask()
+  {
+    return task;
+  }
+
+  @JsonProperty
+  public ClientQuery getPayload()
+  {
+    return payload;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskPayloadResponse{" +
+           "task='" + task + '\'' +
+           ", payload=" + payload +
+           '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
index db861630af8..fddec4b299f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/CompactionSegmentSearchPolicy.java
@@ -22,7 +22,9 @@
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -35,6 +37,7 @@
    */
   CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   );
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 82be5c4458d..ec189c0c2cd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -21,9 +21,13 @@
 
 import com.google.inject.Inject;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.client.indexing.ClientCompactQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
@@ -32,10 +36,12 @@
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.DataSegmentUtils;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -77,23 +83,46 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
         Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList
             .stream()
             .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
-        final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks(
+        final List<TaskStatusPlus> compactTasks = filterNonCompactTasks(
             indexingServiceClient.getRunningTasks(),
             indexingServiceClient.getPendingTasks(),
             indexingServiceClient.getWaitingTasks()
         );
-        final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
+        // dataSource -> list of intervals of compact tasks
+        final Map<String, List<Interval>> compactTaskIntervals = new HashMap<>(compactionConfigList.size());
+        for (TaskStatusPlus status : compactTasks) {
+          final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
+          if (response == null) {
+            throw new ISE("WTH? got a null paylord from overlord for task[%s]", status.getId());
+          }
+          if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
+            final ClientCompactQuery compactQuery = (ClientCompactQuery) response.getPayload();
+            final Interval interval = JodaUtils.umbrellaInterval(
+                compactQuery.getSegments()
+                            .stream()
+                            .map(DataSegment::getInterval)
+                            .sorted(Comparators.intervalsByStartThenEnd())
+                            .collect(Collectors.toList())
+            );
+            compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+          } else {
+            throw new ISE("WTH? task[%s] is not a compactTask?", status.getId());
+          }
+        }
+
+        final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
 
         final int compactionTaskCapacity = (int) Math.min(
             indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
             dynamicConfig.getMaxCompactionTaskSlots()
         );
-        final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ?
-                                                    compactionTaskCapacity - numNonCompleteCompactionTasks :
+        final int numNonCompleteCompactionTasks = compactTasks.size();
+        final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0
+                                                    ? Math.max(0, compactionTaskCapacity - numNonCompleteCompactionTasks)
                                                     // compactionTaskCapacity might be 0 if totalWorkerCapacity is low.
                                                     // This guarantees that at least one slot is available if
                                                     // compaction is enabled and numRunningCompactTasks is 0.
-                                                    Math.max(1, compactionTaskCapacity);
+                                                    : Math.max(1, compactionTaskCapacity);
         LOG.info(
             "Found [%d] available task slots for compaction out of [%d] max compaction task capacity",
             numAvailableCompactionTaskSlots,
@@ -117,7 +146,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   }
 
   @SafeVarargs
-  private static int findNumNonCompleteCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
+  private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus>...taskStatusStreams)
   {
     final List<TaskStatusPlus> allTaskStatusPlus = new ArrayList<>();
     Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll);
@@ -132,8 +161,7 @@ private static int findNumNonCompleteCompactTasks(List<TaskStatusPlus>...taskSta
           // performance.
           return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
         })
-        .collect(Collectors.toList())
-        .size();
+        .collect(Collectors.toList());
   }
 
   private CoordinatorStats doRun(
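
A worked example for the umbrella-interval computation above:
JodaUtils.umbrellaInterval returns the smallest single interval covering
all of its inputs, so a compact task over non-contiguous segment intervals
blocks the whole spanned range, gaps included. A minimal sketch with
made-up intervals:

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.joda.time.Interval;

import java.util.Arrays;

public class UmbrellaIntervalSketch
{
  public static void main(String[] args)
  {
    // Two disjoint intervals of a hypothetical running compact task.
    final Interval umbrella = JodaUtils.umbrellaInterval(
        Arrays.asList(
            Intervals.of("2018-01-01/2018-01-02"),
            Intervals.of("2018-01-05/2018-01-06")
        )
    );
    // Spans earliest start to latest end, gap included:
    // 2018-01-01T00:00:00.000Z/2018-01-06T00:00:00.000Z
    System.out.println(umbrella);
  }
}
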
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
index d114fff0d7e..8c0d80d175f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIterator.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.server.coordinator.helper;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -67,7 +69,8 @@
 
   NewestSegmentFirstIterator(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
     this.compactionConfigs = compactionConfigs;
@@ -80,9 +83,9 @@
       final DataSourceCompactionConfig config = compactionConfigs.get(dataSource);
 
       if (config != null && !timeline.isEmpty()) {
-        final Interval searchInterval = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest());
-        if (searchInterval != null) {
-          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchInterval));
+        final List<Interval> searchIntervals = findInitialSearchInterval(timeline, config.getSkipOffsetFromLatest(), skipIntervals.get(dataSource));
+        if (!searchIntervals.isEmpty()) {
+          timelineIterators.put(dataSource, new CompactibleTimelineObjectHolderCursor(timeline, searchIntervals));
         }
       }
     }
@@ -186,19 +189,22 @@ private void updateQueue(String dataSourceName, DataSourceCompactionConfig confi
 
     CompactibleTimelineObjectHolderCursor(
         VersionedIntervalTimeline<String, DataSegment> timeline,
-        Interval totalIntervalToSearch
+        List<Interval> totalIntervalsToSearch
     )
     {
-      this.holders = timeline
-          .lookup(totalIntervalToSearch)
+      this.holders = totalIntervalsToSearch
           .stream()
-          .filter(holder -> {
-            final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
-            final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
-            return chunks.size() > 0
-                   && partitionBytes > 0
-                   && totalIntervalToSearch.contains(chunks.get(0).getObject().getInterval());
-          })
+          .flatMap(interval -> timeline
+              .lookup(interval)
+              .stream()
+              .filter(holder -> {
+                final List<PartitionChunk<DataSegment>> chunks = Lists.newArrayList(holder.getObject().iterator());
+                final long partitionBytes = chunks.stream().mapToLong(chunk -> chunk.getObject().getSize()).sum();
+                return chunks.size() > 0
+                       && partitionBytes > 0
+                       && interval.contains(chunks.get(0).getObject().getInterval());
+              })
+          )
           .collect(Collectors.toList());
     }
 
@@ -339,15 +345,15 @@ private static SegmentsToCompact findSegmentsToCompact(
   /**
    * Returns the initial searchInterval which is {@code (timeline.first().start, timeline.last().end - skipOffset)}.
    *
-   * @param timeline   timeline of a dataSource
-   * @param skipOffset skipOFfset
+   * @param timeline      timeline of a dataSource
+   * @param skipIntervals intervals to skip
    *
    * @return found interval to search or null if it's not found
    */
-  @Nullable
-  private static Interval findInitialSearchInterval(
+  private static List<Interval> findInitialSearchInterval(
       VersionedIntervalTimeline<String, DataSegment> timeline,
-      Period skipOffset
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
   )
   {
     Preconditions.checkArgument(timeline != null && !timeline.isEmpty(), "timeline should not be null or empty");
@@ -355,35 +361,118 @@ private static Interval findInitialSearchInterval(
 
     final TimelineObjectHolder<String, DataSegment> first = Preconditions.checkNotNull(timeline.first(), "first");
     final TimelineObjectHolder<String, DataSegment> last = Preconditions.checkNotNull(timeline.last(), "last");
+    final List<Interval> fullSkipIntervals = sortAndAddSkipIntervalFromLatest(
+        last.getInterval().getEnd(),
+        skipOffset,
+        skipIntervals
+    );
 
-    final Interval skipInterval = new Interval(skipOffset, last.getInterval().getEnd());
+    final Interval totalInterval = new Interval(first.getInterval().getStart(), last.getInterval().getEnd());
+    final List<Interval> filteredInterval = filterSkipIntervals(totalInterval, fullSkipIntervals);
+    final List<Interval> searchIntervals = new ArrayList<>();
 
-    final DateTime lookupStart = first.getInterval().getStart();
-    final DateTime lookupEnd = last.getInterval().getEnd().minus(skipOffset);
-    if (lookupStart.isBefore(lookupEnd)) {
+    for (Interval lookupInterval : filteredInterval) {
       final List<TimelineObjectHolder<String, DataSegment>> holders = timeline.lookup(
-          new Interval(lookupStart, lookupEnd)
+          new Interval(lookupInterval.getStart(), lookupInterval.getEnd())
       );
 
       final List<DataSegment> segments = holders
           .stream()
           .flatMap(holder -> StreamSupport.stream(holder.getObject().spliterator(), false))
           .map(PartitionChunk::getObject)
-          .filter(segment -> !segment.getInterval().overlaps(skipInterval))
+          .filter(segment -> lookupInterval.contains(segment.getInterval()))
           .sorted((s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval()))
           .collect(Collectors.toList());
 
-      if (segments.isEmpty()) {
-        return null;
-      } else {
-        return new Interval(
-            segments.get(0).getInterval().getStart(),
-            segments.get(segments.size() - 1).getInterval().getEnd()
+      if (!segments.isEmpty()) {
+        searchIntervals.add(
+            new Interval(
+                segments.get(0).getInterval().getStart(),
+                segments.get(segments.size() - 1).getInterval().getEnd()
+            )
         );
       }
+    }
+
+    return searchIntervals;
+  }
+
+  @VisibleForTesting
+  static List<Interval> sortAndAddSkipIntervalFromLatest(
+      DateTime latest,
+      Period skipOffset,
+      @Nullable List<Interval> skipIntervals
+  )
+  {
+    final List<Interval> nonNullSkipIntervals = skipIntervals == null
+                                                ? new ArrayList<>(1)
+                                                : new ArrayList<>(skipIntervals.size());
+
+    if (skipIntervals != null) {
+      final List<Interval> sortedSkipIntervals = new ArrayList<>(skipIntervals);
+      sortedSkipIntervals.sort(Comparators.intervalsByStartThenEnd());
+
+      final List<Interval> overlapIntervals = new ArrayList<>();
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+
+      for (Interval interval : sortedSkipIntervals) {
+        if (interval.overlaps(skipFromLatest)) {
+          overlapIntervals.add(interval);
+        } else {
+          nonNullSkipIntervals.add(interval);
+        }
+      }
+
+      if (!overlapIntervals.isEmpty()) {
+        overlapIntervals.add(skipFromLatest);
+        nonNullSkipIntervals.add(JodaUtils.umbrellaInterval(overlapIntervals));
+      } else {
+        nonNullSkipIntervals.add(skipFromLatest);
+      }
     } else {
-      return null;
+      final Interval skipFromLatest = new Interval(skipOffset, latest);
+      nonNullSkipIntervals.add(skipFromLatest);
+    }
+
+    return nonNullSkipIntervals;
+  }
+
+  /**
+   * Returns a list of intervals which are contained by totalInterval but don't overlap with skipIntervals.
+   *
+   * @param totalInterval total interval
+   * @param skipIntervals intervals to skip. This should be sorted by {@link Comparators#intervalsByStartThenEnd()}.
+   */
+  @VisibleForTesting
+  static List<Interval> filterSkipIntervals(Interval totalInterval, List<Interval> skipIntervals)
+  {
+    final List<Interval> filteredIntervals = new ArrayList<>(skipIntervals.size() + 1);
+
+    DateTime remainingStart = totalInterval.getStart();
+    DateTime remainingEnd = totalInterval.getEnd();
+    for (Interval skipInterval : skipIntervals) {
+      if (skipInterval.getStart().isBefore(remainingStart) && skipInterval.getEnd().isAfter(remainingStart)) {
+        remainingStart = skipInterval.getEnd();
+      } else if (skipInterval.getStart().isBefore(remainingEnd) && skipInterval.getEnd().isAfter(remainingEnd)) {
+        remainingEnd = skipInterval.getStart();
+      } else if (!remainingStart.isAfter(skipInterval.getStart()) && !remainingEnd.isBefore(skipInterval.getEnd())) {
+        filteredIntervals.add(new Interval(remainingStart, skipInterval.getStart()));
+        remainingStart = skipInterval.getEnd();
+      } else {
+        // Ignore this skipInterval
+        log.warn(
+            "skipInterval[%s] is not contained in remainingInterval[%s]",
+            skipInterval,
+            new Interval(remainingStart, remainingEnd)
+        );
+      }
+    }
+
+    if (!remainingStart.equals(remainingEnd)) {
+      filteredIntervals.add(new Interval(remainingStart, remainingEnd));
     }
+
+    return filteredIntervals;
   }
 
   private static class QueueEntry
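
To make the carving behavior of filterSkipIntervals above concrete, here
is a minimal sketch with made-up dates (the method is reachable from this
package because it is left package-visible for testing):

package org.apache.druid.server.coordinator.helper;

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.Intervals;
import org.joda.time.Interval;

import java.util.List;

public class FilterSkipIntervalsSketch
{
  public static void main(String[] args)
  {
    // Cutting one skip interval out of the total search interval returns
    // the remainder: [2018-01-01/2018-02-01, 2018-03-01/2019-01-01].
    final List<Interval> searchable = NewestSegmentFirstIterator.filterSkipIntervals(
        Intervals.of("2018-01-01/2019-01-01"),
        ImmutableList.of(Intervals.of("2018-02-01/2018-03-01"))
    );
    System.out.println(searchable);
  }
}
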
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
index 19b882e0097..8a4118221d0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicy.java
@@ -22,7 +22,9 @@
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,9 +35,10 @@
   @Override
   public CompactionSegmentIterator reset(
       Map<String, DataSourceCompactionConfig> compactionConfigs,
-      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources
+      Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
+      Map<String, List<Interval>> skipIntervals
   )
   {
-    return new NewestSegmentFirstIterator(compactionConfigs, dataSources);
+    return new NewestSegmentFirstIterator(compactionConfigs, dataSources, skipIntervals);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index d395a6c2efa..01f84df01f3 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -109,4 +109,10 @@ public TaskStatusPlus getLastCompleteTask()
   {
     return null;
   }
+
+  @Override
+  public TaskPayloadResponse getTaskPayload(String taskId)
+  {
+    return null;
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
new file mode 100644
index 00000000000..0e5f3339709
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstIteratorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.helper;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class NewestSegmentFirstIteratorTest
+{
+  @Test
+  public void testFilterSkipIntervals()
+  {
+    final Interval totalInterval = Intervals.of("2018-01-01/2019-01-01");
+    final List<Interval> expectedSkipIntervals = ImmutableList.of(
+        Intervals.of("2018-01-15/2018-03-02"),
+        Intervals.of("2018-07-23/2018-10-01"),
+        Intervals.of("2018-10-02/2018-12-25"),
+        Intervals.of("2018-12-31/2019-01-01")
+    );
+    final List<Interval> skipIntervals = NewestSegmentFirstIterator.filterSkipIntervals(
+        totalInterval,
+        Lists.newArrayList(
+            Intervals.of("2017-12-01/2018-01-15"),
+            Intervals.of("2018-03-02/2018-07-23"),
+            Intervals.of("2018-10-01/2018-10-02"),
+            Intervals.of("2018-12-25/2018-12-31")
+        )
+    );
+
+    Assert.assertEquals(expectedSkipIntervals, skipIntervals);
+  }
+
+  @Test
+  public void testAddSkipIntervalFromLatestAndSort()
+  {
+    final List<Interval> expectedIntervals = ImmutableList.of(
+        Intervals.of("2018-12-24/2018-12-25"),
+        Intervals.of("2018-12-29/2019-01-01")
+    );
+    final List<Interval> fullSkipIntervals = NewestSegmentFirstIterator.sortAndAddSkipIntervalFromLatest(
+        DateTimes.of("2019-01-01"),
+        new Period(72, 0, 0, 0),
+        ImmutableList.of(
+            Intervals.of("2018-12-30/2018-12-31"),
+            Intervals.of("2018-12-24/2018-12-25")
+        )
+    );
+
+    Assert.assertEquals(expectedIntervals, fullSkipIntervals);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
index 5ddbaab6cd9..8fe3e37cfa4 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/NewestSegmentFirstPolicyTest.java
@@ -78,7 +78,8 @@ public void testLargeOffsetAndSmallSegmentInterval()
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -102,7 +103,8 @@ public void testSmallOffsetAndLargeSegmentInterval()
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -178,7 +180,8 @@ public void testLargeGapInData()
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -211,7 +214,8 @@ public void testSmallNumTargetCompactionSegments()
                 // larger gap than SegmentCompactorUtil.LOOKUP_PERIOD (1 day)
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-15T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -258,7 +262,8 @@ public void testHugeShard()
                     DEFAULT_NUM_SEGMENTS_PER_SHARD
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -313,7 +318,8 @@ public void testManySegmentsPerShard()
                     80
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Interval lastInterval = null;
@@ -392,7 +398,8 @@ public void testManySegmentsPerShard2()
                     150
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -416,7 +423,8 @@ public void testSkipUnknownDataSource()
                 new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
                 new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     assertCompactSegmentIntervals(
@@ -449,7 +457,8 @@ public void testIgnoreSingleSegmentToCompact()
                     1
                 )
             )
-        )
+        ),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -481,7 +490,8 @@ public void testClearSegmentsToCompactWhenSkippingSegments()
     );
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(maxSizeOfSegmentsToCompact, 100, new Period("P0D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     final List<DataSegment> expectedSegmentsToCompact = timeline
@@ -510,7 +520,8 @@ public void testIfFirstSegmentIsInSkipOffset()
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
@@ -530,12 +541,53 @@ public void testIfFirstSegmentOverlapsSkipOffset()
 
     final CompactionSegmentIterator iterator = policy.reset(
         ImmutableMap.of(DATA_SOURCE, createCompactionConfig(40000, 100, new Period("P1D"))),
-        ImmutableMap.of(DATA_SOURCE, timeline)
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
     );
 
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testWithSkipIntervals()
+  {
+    final Period segmentPeriod = new Period("PT1H");
+    final CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(10000, 100, new Period("P1D"))),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            createTimeline(
+                new SegmentGenerateSpec(Intervals.of("2017-11-16T20:00:00/2017-11-17T04:00:00"), segmentPeriod),
+                new SegmentGenerateSpec(Intervals.of("2017-11-14T00:00:00/2017-11-16T07:00:00"), segmentPeriod)
+            )
+        ),
+        ImmutableMap.of(
+            DATA_SOURCE,
+            ImmutableList.of(
+                Intervals.of("2017-11-16T00:00:00/2017-11-17T00:00:00"),
+                Intervals.of("2017-11-15T00:00:00/2017-11-15T20:00:00"),
+                Intervals.of("2017-11-13T00:00:00/2017-11-14T01:00:00")
+            )
+        )
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-15T20:00:00/2017-11-15T21:00:00"),
+        Intervals.of("2017-11-15T23:00:00/2017-11-16T00:00:00"),
+        false
+    );
+
+    assertCompactSegmentIntervals(
+        iterator,
+        segmentPeriod,
+        Intervals.of("2017-11-14T01:00:00/2017-11-14T02:00:00"),
+        Intervals.of("2017-11-14T23:00:00/2017-11-15T00:00:00"),
+        true
+    );
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,


 
