Posted to commits@druid.apache.org by ag...@apache.org on 2022/04/15 16:08:14 UTC

[druid] branch master updated: Make tombstones ingestible by having them return an empty result set. (#12392)

This is an automated email from the ASF dual-hosted git repository.

agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0460d45e92 Make tombstones ingestible by having them return an empty result set. (#12392)
0460d45e92 is described below

commit 0460d45e92a15ebdadb4455afa16a7d977f74388
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Fri Apr 15 09:08:06 2022 -0700

    Make tombstones ingestible by having them return an empty result set. (#12392)
    
    * Make tombstones ingestible by having them return an empty result set.
    
    * Spotbug
    
    * Coverage
    
    * Coverage
    
    * Remove unnecessary exception (checkstyle)
    
    * Fix integration test and add one more to test dropExisting set to false over tombstones
    
    * Force dropExisting to true in auto-compaction when the interval contains only tombstones
    
    * Checkstyle, fix unit test
    
    * Changed flag by mistake, fixing it
    
    * Remove method from interface since this method is specific only to DruidSegmentInputEntity
    
    * Fix typo
    
    * Adapt to latest code
    
    * Update comments when only tombstones to compact
    
    * Move empty iterator to a new DruidTombstoneSegmentReader
    
    * Code review feedback
    
    * Checkstyle
    
    * Review feedback
    
    * Coverage
---
 .../org/apache/druid/common/guava/GuavaUtils.java  |   5 +-
 .../org/apache/druid/data/input/InputEntity.java   |   1 +
 .../druid/indexing/common/task/CompactionTask.java | 165 ++++++++++--------
 .../druid/indexing/input/DruidInputSource.java     |   1 -
 .../indexing/input/DruidSegmentInputEntity.java    |   6 +
 .../indexing/input/DruidSegmentInputFormat.java    |  31 +++-
 .../druid/indexing/input/DruidSegmentReader.java   |   6 +-
 .../input/DruidTombstoneSegmentReader.java         |  88 ++++++++++
 .../common/task/CompactionTaskRunTest.java         | 116 +++++++++++++
 .../parallel/DimensionCardinalityReportTest.java   | 114 ++++++++++++
 .../input/DruidSegmentInputFormatTest.java         |  88 ++++++++++
 .../indexing/input/DruidSegmentReaderTest.java     | 114 +++++++++++-
 .../coordinator/duty/ITAutoCompactionTest.java     | 193 +++++++++++++++++++--
 .../server/coordinator/duty/CompactSegments.java   |  20 +++
 14 files changed, 844 insertions(+), 104 deletions(-)
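
At a high level, this change routes input entities created from tombstone segments to a new
DruidTombstoneSegmentReader, whose intermediate-row iterator is empty, while regular segments continue to use
DruidSegmentReader. A minimal sketch of that dispatch, condensed from the DruidSegmentInputFormat.createReader
change in the diff below (not the verbatim committed code):

    // Condensed sketch of DruidSegmentInputFormat.createReader after this commit.
    public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
    {
      Preconditions.checkArgument(
          source instanceof DruidSegmentInputEntity,
          DruidSegmentInputEntity.class.getName() + " required, but " + source.getClass().getName() + " provided."
      );
      if (((DruidSegmentInputEntity) source).isFromTombstone()) {
        // Tombstones carry no rows, so their reader yields an empty iterator and ingestion sees an empty result set.
        return new DruidTombstoneSegmentReader(source);
      }
      return new DruidSegmentReader(
          source, indexIO, inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(),
          inputRowSchema.getColumnsFilter(), dimFilter, temporaryDirectory
      );
    }

CompactionTask, DruidInputSource, and CompactSegments are adjusted so tombstones are no longer filtered out before
ingestion, and auto-compaction forces dropExisting to true when the interval to compact contains only tombstones.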

diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
index b3455393e7..f8b9dd03b7 100644
--- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
+++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
@@ -71,7 +71,7 @@ public class GuavaUtils
 
   /**
    * If first argument is not null, return it, else return the other argument. Sort of like
-   * {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
+   * {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
    * null.
    */
   @Nullable
@@ -85,7 +85,8 @@ public class GuavaUtils
 
   /**
    * Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
-   * automatically. Especially when we call {@link  com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
+   * automatically. Especially when we call
+   * {@link  static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
    * future.
    * @param mayInterruptIfRunning {@code true} if the thread executing this
    * task should be interrupted; otherwise, in-progress tasks are allowed
diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 343e245e22..6765ae8271 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -136,4 +136,5 @@ public interface InputEntity
   {
     return Predicates.alwaysFalse();
   }
+
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 66e0c16a70..067f75ec7d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -111,6 +112,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -550,6 +552,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         segmentProvider,
         lockGranularityInUse
     );
+
     final Map<DataSegment, File> segmentFileMap = pair.lhs;
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
 
@@ -557,9 +560,10 @@ public class CompactionTask extends AbstractBatchIndexTask
       return Collections.emptyList();
     }
 
-    // find metadata for interval
+    // find metadata for intervals with real data segments
     // queryableIndexAndSegments is sorted by the interval of the dataSegment
-    final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+    // Note that this list will contain null QueryableIndex values for tombstones
+    final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
         timelineSegments,
         segmentFileMap,
         toolbox.getIndexIO()
@@ -568,8 +572,10 @@ public class CompactionTask extends AbstractBatchIndexTask
     final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
 
     if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
+      final List<ParallelIndexIngestionSpec> specs = new ArrayList<>();
+
       // original granularity
-      final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
+      final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
           Comparators.intervalsByStartThenEnd()
       );
       queryableIndexAndSegments.forEach(
@@ -578,11 +584,11 @@ public class CompactionTask extends AbstractBatchIndexTask
       );
 
       // unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec
-      List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
+      List<NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
           new ArrayList<>();
       Interval union = null;
-      List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
-      for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
+      List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+      for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
         Interval cur = entry.getKey();
         if (union == null) {
           union = cur;
@@ -596,12 +602,12 @@ public class CompactionTask extends AbstractBatchIndexTask
           segments = new ArrayList<>(entry.getValue());
         }
       }
+
       intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
 
-      final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
-      for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
+      for (NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
         final Interval interval = entry.lhs;
-        final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
+        final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
         // If granularitySpec is not null, then set segmentGranularity. Otherwise,
         // creates new granularitySpec and set segmentGranularity
         Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
@@ -700,22 +706,19 @@ public class CompactionTask extends AbstractBatchIndexTask
       LockGranularity lockGranularityInUse
   ) throws IOException, SegmentLoadingException
   {
-    final List<DataSegment> usedSegmentsMinusTombstones =
-        segmentProvider.findSegments(toolbox.getTaskActionClient())
-                       .stream()
-                       .filter(dataSegment -> !dataSegment.isTombstone()) // skip tombstones
-                       .collect(Collectors.toList());
-    segmentProvider.checkSegments(lockGranularityInUse, usedSegmentsMinusTombstones);
-    final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegmentsMinusTombstones);
+    final List<DataSegment> usedSegments =
+        segmentProvider.findSegments(toolbox.getTaskActionClient());
+    segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
+    final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
-        .forSegments(usedSegmentsMinusTombstones)
+        .forSegments(usedSegments)
         .lookup(segmentProvider.interval);
     return new NonnullPair<>(segmentFileMap, timelineSegments);
   }
 
   private static DataSchema createDataSchema(
       String dataSource,
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
       @Nullable DimensionsSpec dimensionsSpec,
       @Nullable ClientCompactionTaskTransformSpec transformSpec,
       @Nullable AggregatorFactory[] metricsSpec,
@@ -781,34 +784,36 @@ public class CompactionTask extends AbstractBatchIndexTask
   private static void decideRollupAndQueryGranularityCarryOver(
       SettableSupplier<Boolean> rollup,
       SettableSupplier<Granularity> queryGranularity,
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
   )
   {
     final SettableSupplier<Boolean> rollupIsValid = new SettableSupplier<>(true);
-    for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+    for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
       final QueryableIndex index = pair.lhs;
-      if (index.getMetadata() == null) {
-        throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
-      }
-      // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
-
-      // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
-      if (rollupIsValid.get()) {
-        Boolean isRollup = index.getMetadata().isRollup();
-        if (isRollup == null) {
-          rollupIsValid.set(false);
-          rollup.set(false);
-        } else if (rollup.get() == null) {
-          rollup.set(isRollup);
-        } else if (!rollup.get().equals(isRollup.booleanValue())) {
-          rollupIsValid.set(false);
-          rollup.set(false);
+      if (index != null) { // avoid tombstones
+        if (index.getMetadata() == null) {
+          throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
+        }
+        // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
+
+        // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
+        if (rollupIsValid.get()) {
+          Boolean isRollup = index.getMetadata().isRollup();
+          if (isRollup == null) {
+            rollupIsValid.set(false);
+            rollup.set(false);
+          } else if (rollup.get() == null) {
+            rollup.set(isRollup);
+          } else if (!rollup.get().equals(isRollup.booleanValue())) {
+            rollupIsValid.set(false);
+            rollup.set(false);
+          }
         }
-      }
 
-      // Pick the finer, non-null, of the query granularities of the segments being compacted
-      Granularity current = index.getMetadata().getQueryGranularity();
-      queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+        // Pick the finer, non-null, of the query granularities of the segments being compacted
+        Granularity current = index.getMetadata().getQueryGranularity();
+        queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+      }
     }
   }
 
@@ -828,22 +833,28 @@ public class CompactionTask extends AbstractBatchIndexTask
   }
 
   private static AggregatorFactory[] createMetricsSpec(
-      List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+      List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
   )
   {
     final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
         .stream()
+        .filter(pair -> pair.lhs != null) // avoid tombstones
         .map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
         .collect(Collectors.toList());
     final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
 
     if (mergedAggregators == null) {
-      throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+      Optional<Pair<QueryableIndex, DataSegment>> pair =
+          queryableIndexAndSegments.stream().filter(p -> !p.rhs.isTombstone()).findFirst();
+      if (pair.isPresent()) {
+        // this means that there are true data segments, so something went wrong
+        throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+      }
     }
     return mergedAggregators;
   }
 
-  private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
+  private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
   {
     final BiMap<String, Integer> uniqueDims = HashBiMap.create();
     final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -859,33 +870,35 @@ public class CompactionTask extends AbstractBatchIndexTask
     );
 
     int index = 0;
-    for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
+    for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
       final QueryableIndex queryableIndex = pair.lhs;
-      final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
+      if (queryableIndex != null) { // avoid tombstones
+        final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
 
-      for (String dimension : queryableIndex.getAvailableDimensions()) {
-        final ColumnHolder columnHolder = Preconditions.checkNotNull(
-            queryableIndex.getColumnHolder(dimension),
-            "Cannot find column for dimension[%s]",
-            dimension
-        );
-
-        if (!uniqueDims.containsKey(dimension)) {
-          final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
-              dimensionHandlerMap.get(dimension),
-              "Cannot find dimensionHandler for dimension[%s]",
+        for (String dimension : queryableIndex.getAvailableDimensions()) {
+          final ColumnHolder columnHolder = Preconditions.checkNotNull(
+              queryableIndex.getColumnHolder(dimension),
+              "Cannot find column for dimension[%s]",
               dimension
           );
 
-          uniqueDims.put(dimension, index++);
-          dimensionSchemaMap.put(
-              dimension,
-              createDimensionSchema(
-                  dimension,
-                  columnHolder.getCapabilities(),
-                  dimensionHandler.getMultivalueHandling()
-              )
-          );
+          if (!uniqueDims.containsKey(dimension)) {
+            final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
+                dimensionHandlerMap.get(dimension),
+                "Cannot find dimensionHandler for dimension[%s]",
+                dimension
+            );
+
+            uniqueDims.put(dimension, index++);
+            dimensionSchemaMap.put(
+                dimension,
+                createDimensionSchema(
+                    dimension,
+                    columnHolder.getCapabilities(),
+                    dimensionHandler.getMultivalueHandling()
+                )
+            );
+          }
         }
       }
     }
@@ -905,25 +918,33 @@ public class CompactionTask extends AbstractBatchIndexTask
     return new DimensionsSpec(dimensionSchemas);
   }
 
-  private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
+  /**
+   * This private method does not load (i.e. does not create a QueryableIndex for) tombstones, since
+   * tombstones have no file image and are never pushed to deep storage.
+   * Thus, for a tombstone, the returned list
+   * will contain a null in the QueryableIndex slot of the pair (lhs).
+   */
+  private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
       List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
-    final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+    final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
 
     for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
       final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
       for (PartitionChunk<DataSegment> chunk : partitionHolder) {
+        QueryableIndex queryableIndex = null;
         final DataSegment segment = chunk.getObject();
-        final QueryableIndex queryableIndex = indexIO.loadIndex(
-            Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
-        );
-        segments.add(new NonnullPair<>(queryableIndex, segment));
+        if (!chunk.getObject().isTombstone()) {
+          queryableIndex = indexIO.loadIndex(
+              Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
+          );
+        }
+        segments.add(new Pair<>(queryableIndex, segment));
       }
     }
-
     return segments;
   }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 6f1a6d6446..d10f3d3e9f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -237,7 +237,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
           //noinspection ConstantConditions
           return FluentIterable
               .from(partitionHolder)
-              .filter(chunk -> !chunk.getObject().isTombstone())
               .transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
         }).iterator();
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
index 63f2fe25c5..3396195a21 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
@@ -91,4 +91,10 @@ public class DruidSegmentInputEntity implements InputEntity
       }
     };
   }
+
+  public boolean isFromTombstone()
+  {
+    return segment.isTombstone();
+  }
+
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
index 4d028596ff..732288e854 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.input;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputFormat;
@@ -55,14 +56,28 @@ public class DruidSegmentInputFormat implements InputFormat
       File temporaryDirectory
   )
   {
-    return new DruidSegmentReader(
-        source,
-        indexIO,
-        inputRowSchema.getTimestampSpec(),
-        inputRowSchema.getDimensionsSpec(),
-        inputRowSchema.getColumnsFilter(),
-        dimFilter,
-        temporaryDirectory
+    // this method handles the case when the entity comes from a tombstone or from a regular segment
+    Preconditions.checkArgument(
+        source instanceof DruidSegmentInputEntity,
+        DruidSegmentInputEntity.class.getName() + " required, but "
+        + source.getClass().getName() + " provided."
     );
+
+    final InputEntityReader retVal;
+    // Cast is safe here because the precondition above has already checked the type
+    if (((DruidSegmentInputEntity) source).isFromTombstone()) {
+      retVal = new DruidTombstoneSegmentReader(source);
+    } else {
+      retVal = new DruidSegmentReader(
+          source,
+          indexIO,
+          inputRowSchema.getTimestampSpec(),
+          inputRowSchema.getDimensionsSpec(),
+          inputRowSchema.getColumnsFilter(),
+          dimFilter,
+          temporaryDirectory
+      );
+    }
+    return retVal;
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 7117eea0a9..87181b8cb4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.input;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -72,7 +71,7 @@ import java.util.Set;
 
 public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
 {
-  private final DruidSegmentInputEntity source;
+  private DruidSegmentInputEntity source;
   private final IndexIO indexIO;
   private final ColumnsFilter columnsFilter;
   private final InputRowSchema inputRowSchema;
@@ -89,7 +88,6 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
       final File temporaryDirectory
   )
   {
-    Preconditions.checkArgument(source instanceof DruidSegmentInputEntity);
     this.source = (DruidSegmentInputEntity) source;
     this.indexIO = indexIO;
     this.columnsFilter = columnsFilter;
@@ -105,7 +103,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
   @Override
   protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throws IOException
   {
-    final CleanableFile segmentFile = source.fetch(temporaryDirectory, null);
+    final CleanableFile segmentFile = source().fetch(temporaryDirectory, null);
     final WindowedStorageAdapter storageAdapter = new WindowedStorageAdapter(
         new QueryableIndexStorageAdapter(
             indexIO.loadIndex(segmentFile.file())
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java
new file mode 100644
index 0000000000..2a80112df8
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * This class returns an empty iterator since a tombstone has no data rows.
+ */
+public class DruidTombstoneSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
+{
+  private DruidSegmentInputEntity source;
+
+  public DruidTombstoneSegmentReader(
+      InputEntity source
+  )
+  {
+    this.source = (DruidSegmentInputEntity) source;
+    if (!this.source.isFromTombstone()) {
+      throw new IAE("DruidSegmentInputEntity must be created from a tombstone but is not.");
+    }
+  }
+
+  @Override
+  protected CloseableIterator<Map<String, Object>> intermediateRowIterator()
+  {
+    return new CloseableIterator<Map<String, Object>>()
+    {
+      @Override
+      public void close()
+      {
+
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return false;
+      }
+
+      @Override
+      public Map<String, Object> next()
+      {
+        throw new NoSuchElementException();
+      }
+    };
+  }
+
+  @VisibleForTesting
+  @Override
+  protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow)
+  {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+  @Override
+  protected List<Map<String, Object>> toMap(Map<String, Object> intermediateRow)
+  {
+    throw new UnsupportedOperationException(getClass().getName());
+  }
+
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 1c3463624e..bfd184e137 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.segment.loading.SegmentLocalCacheManager;
 import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.loading.TombstoneLoadSpec;
 import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
 import org.apache.druid.server.security.AuthTestUtils;
@@ -1099,6 +1100,120 @@ public class CompactionTaskRunTest extends IngestionTestBase
     
   }
 
+  @Test
+  public void testCompactDatasourceOverIntervalWithOnlyTombstones() throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    // The following task creates six HOUR segments with intervals of
+    // - 2014-01-01T00:00:00/2014-01-01T01:00:00
+    // - 2014-01-01T01:00:00/2014-01-01T02:00:00
+    // - 2014-01-01T02:00:00/2014-01-01T03:00:00
+    // The six segments are:
+    // three rows in hour 00:
+    // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z with two rows
+    // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z_1 with one row
+    // three rows in hour 01:
+    // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z with two rows
+    // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z_1 with one row
+    // four rows in hour 02:
+    // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z with two rows
+    // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z_1 with two rows
+    // there are 10 rows total in the data set
+
+    // maxRowsPerSegment is set to 2 inside the runIndexTask methods
+    Pair<TaskStatus, List<DataSegment>> result = runIndexTask();
+    Assert.assertEquals(6, result.rhs.size());
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    // Set up partial interval compaction:
+    // Change the granularity from HOUR to MINUTE through compaction for hour 01. There are three rows in the
+    // compaction interval, all with the same timestamp (see TEST_ROWS), so this should generate one segment
+    // in the same, first, minute of hour 01
+    // (the task will use the default rows per segment since compaction's tuning config is null)
+    // plus 59 tombstones to completely overshadow the existing hour 01 segments.
+    // Since the segments outside the compaction interval should remain unchanged,
+    // there should be a total of 2 + (1 + 59) + 2 = 64 segments
+
+    // **** PARTIAL COMPACTION: hour -> minute ****
+    final Interval compactionPartialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+    final CompactionTask partialCompactionTask = builder
+        .segmentGranularity(Granularities.MINUTE)
+        // Set dropExisting to true
+        .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true)
+        .build();
+    final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+    Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+
+    // Segments that did not belong in the compaction interval (hours 00 and 02) are expected unchanged
+    // add 2 unchanged segments for hour 00:
+    final Set<DataSegment> expectedSegments = new HashSet<>();
+    expectedSegments.addAll(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01T00:00:00/2014-01-01T01:00:00")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    // add 2 unchanged segments for hour 02:
+    expectedSegments.addAll(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01T02:00:00/2014-01-01T03:00:00")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    expectedSegments.addAll(partialCompactionResult.rhs);
+    Assert.assertEquals(64, expectedSegments.size());
+
+    // New segments that were compacted are expected. However, old segments of the compacted interval should be
+    // overshadowed by the new tombstones (59) being created for all minutes other than 01:01
+    final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+    final List<DataSegment> realSegmentsAfterPartialCompaction =
+        segmentsAfterPartialCompaction.stream()
+                                      .filter(s -> !s.isTombstone())
+                                      .collect(Collectors.toList());
+    final List<DataSegment> tombstonesAfterPartialCompaction =
+        segmentsAfterPartialCompaction.stream()
+                                      .filter(s -> s.isTombstone())
+                                      .collect(Collectors.toList());
+    Assert.assertEquals(59, tombstonesAfterPartialCompaction.size());
+    Assert.assertEquals(5, realSegmentsAfterPartialCompaction.size());
+    Assert.assertEquals(64, segmentsAfterPartialCompaction.size());
+
+    // Now set up compaction over an interval with only tombstones, keeping the same (minute) granularity
+    final CompactionTask compactionTaskOverOnlyTombstones = builder
+        .segmentGranularity(null)
+        // Set dropExisting to true
+        // last 59 minutes of hour 01 should be all tombstones
+        .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01T01:01:00/2014-01-01T02:00:00"), null), true)
+        .build();
+
+    // **** Compaction over tombstones ****
+    final Pair<TaskStatus, List<DataSegment>> resultOverOnlyTombstones = runTask(compactionTaskOverOnlyTombstones);
+    Assert.assertTrue(resultOverOnlyTombstones.lhs.isSuccess());
+
+    // compaction should not fail but since it is over the same granularity it should leave
+    // the tombstones unchanged
+    Assert.assertEquals(59, resultOverOnlyTombstones.rhs.size());
+    resultOverOnlyTombstones.rhs.forEach(t -> Assert.assertTrue(t.isTombstone()));
+  }
+
   @Test
   public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception
   {
@@ -1454,6 +1569,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     final ObjectMapper objectMapper = getObjectMapper();
     objectMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
     objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
+    objectMapper.registerSubtypes(TombstoneLoadSpec.class);
 
     final TaskToolbox box = createTaskToolbox(objectMapper, task);
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
index 29bbe5c97f..876458d917 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
@@ -179,4 +179,118 @@ public class DimensionCardinalityReportTest
         intervalToNumShards
     );
   }
+
+
+  @Test
+  public void testLargeSupervisorDetermineNumShardsFromCardinalityReport()
+  {
+    List<DimensionCardinalityReport> reports = new ArrayList<>();
+
+    HllSketch collector1 = DimensionCardinalityReport.createHllSketchForReport();
+    collector1.update(IndexTask.HASH_FUNCTION.hashLong(1L).asBytes());
+    collector1.update(IndexTask.HASH_FUNCTION.hashLong(200L).asBytes());
+    DimensionCardinalityReport report1 = new DimensionCardinalityReport(
+        "taskA",
+        ImmutableMap.of(
+            Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"),
+            collector1.toCompactByteArray()
+        )
+    );
+    reports.add(report1);
+
+    HllSketch collector2 = DimensionCardinalityReport.createHllSketchForReport();
+    collector2.update(IndexTask.HASH_FUNCTION.hashLong(1000L).asBytes());
+    collector2.update(IndexTask.HASH_FUNCTION.hashLong(30000L).asBytes());
+    DimensionCardinalityReport report2 = new DimensionCardinalityReport(
+        "taskB",
+        ImmutableMap.of(
+            Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"),
+            collector2.toCompactByteArray()
+        )
+    );
+    reports.add(report2);
+
+    // Separate interval with only 1 value
+    HllSketch collector3 = DimensionCardinalityReport.createHllSketchForReport();
+    collector3.update(IndexTask.HASH_FUNCTION.hashLong(99000L).asBytes());
+    DimensionCardinalityReport report3 = new DimensionCardinalityReport(
+        "taskC",
+        ImmutableMap.of(
+            Intervals.of("1970-01-02T00:00:00.000Z/1970-01-03T00:00:00.000Z"),
+            collector3.toCompactByteArray()
+        )
+    );
+    reports.add(report3);
+
+    // first interval in test has cardinality 4
+    Map<Interval, Integer> intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+        reports,
+        1
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            4,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
+
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+        reports,
+        2
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            2,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
+
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+        reports,
+        3
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
+
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+        reports,
+        4
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
+
+    intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+        reports,
+        5
+    );
+    Assert.assertEquals(
+        ImmutableMap.of(
+            Intervals.of("1970-01-01/P1D"),
+            1,
+            Intervals.of("1970-01-02/P1D"),
+            1
+        ),
+        intervalToNumShards
+    );
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
new file mode 100644
index 0000000000..910371a566
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.Intervals;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertThrows;
+
+
+public class DruidSegmentInputFormatTest
+{
+
+  private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
+      new TimestampSpec("ts", "auto", null),
+      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
+      ColumnsFilter.all()
+  );
+
+
+  @Test
+  public void testDruidSegmentInputEntityReader()
+  {
+    DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+    InputEntityReader reader = format.createReader(
+        INPUT_ROW_SCHEMA,
+        DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null),
+        null
+    );
+    Assert.assertTrue(reader instanceof DruidSegmentReader);
+  }
+
+
+  @Test
+  public void testDruidTombstoneSegmentInputEntityReader()
+  {
+    DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+    InputEntityReader reader = format.createReader(
+        INPUT_ROW_SCHEMA,
+        DruidSegmentReaderTest.makeTombstoneInputEntity(Intervals.of("2000/P1D")),
+        null
+    );
+    Assert.assertTrue(reader instanceof DruidTombstoneSegmentReader);
+  }
+
+  @Test
+  public void testDruidSegmentInputEntityReaderBadEntity()
+  {
+    DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      format.createReader(
+          INPUT_ROW_SCHEMA,
+          new FileEntity(null),
+          null
+      );
+    });
+    String expectedMessage =
+        "org.apache.druid.indexing.input.DruidSegmentInputEntity required, but org.apache.druid.data.input.impl.FileEntity provided.";
+    String actualMessage = exception.getMessage();
+    Assert.assertEquals(expectedMessage, actualMessage);
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 9301b891e4..1638e79e9a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FileEntity;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.hll.HyperLogLogCollector;
@@ -52,6 +53,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.loading.SegmentCacheManager;
 import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.TombstoneShardSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,6 +70,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import static org.junit.Assert.assertThrows;
+
 public class DruidSegmentReaderTest extends NullHandlingTest
 {
   @Rule
@@ -186,6 +190,42 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     );
   }
 
+  @Test
+  public void testDruidTombstoneSegmentReader() throws IOException
+  {
+    final DruidTombstoneSegmentReader reader = new DruidTombstoneSegmentReader(
+        makeTombstoneInputEntity(Intervals.of("2000/P1D"))
+    );
+
+    Assert.assertFalse(reader.intermediateRowIterator().hasNext());
+    Assert.assertEquals(
+        Collections.emptyList(),
+        readRows(reader)
+    );
+  }
+
+  @Test
+  public void testDruidTombstoneSegmentReaderBadEntity()
+  {
+    assertThrows(ClassCastException.class, () -> {
+      new DruidTombstoneSegmentReader(
+          new FileEntity(null));
+    });
+  }
+
+  @Test
+  public void testDruidTombstoneSegmentReaderNotCreatedFromTombstone()
+  {
+    Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+      new DruidTombstoneSegmentReader(makeInputEntity(Intervals.of("2000/P1D")));
+    });
+    String expectedMessage =
+        "DruidSegmentInputEntity must be created from a tombstone but is not.";
+    String actualMessage = exception.getMessage();
+    Assert.assertEquals(expectedMessage, actualMessage);
+
+  }
+
   @Test
   public void testReaderAutoTimestampFormat() throws IOException
   {
@@ -582,6 +622,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
   }
 
   private DruidSegmentInputEntity makeInputEntity(final Interval interval)
+  {
+    return makeInputEntity(interval, segmentDirectory);
+  }
+
+  public static DruidSegmentInputEntity makeInputEntity(final Interval interval, final File segmentDirectory)
   {
     return new DruidSegmentInputEntity(
         new SegmentCacheManager()
@@ -634,7 +679,62 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     );
   }
 
-  private List<InputRow> readRows(final DruidSegmentReader reader) throws IOException
+  public static DruidSegmentInputEntity makeTombstoneInputEntity(final Interval interval)
+  {
+    return new DruidSegmentInputEntity(
+        new SegmentCacheManager()
+        {
+          @Override
+          public boolean isSegmentCached(DataSegment segment)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+
+          @Override
+          public File getSegmentFiles(DataSegment segment)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+
+          @Override
+          public void cleanup(DataSegment segment)
+          {
+            throw new UnsupportedOperationException("unused");
+          }
+
+          @Override
+          public boolean reserve(DataSegment segment)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public boolean release(DataSegment segment)
+          {
+            throw new UnsupportedOperationException();
+          }
+
+          @Override
+          public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+          {
+            throw new UnsupportedOperationException();
+
+          }
+        },
+        DataSegment.builder()
+                   .dataSource("ds")
+                   .interval(Intervals.of("2000/P1D"))
+                   .version("1")
+                   .shardSpec(new TombstoneShardSpec())
+                   .loadSpec(ImmutableMap.of("type", "tombstone"))
+                   .size(1)
+                   .build(),
+        interval
+    );
+  }
+
+
+  private List<InputRow> readRows(DruidSegmentReader reader) throws IOException
   {
     final List<InputRow> rows = new ArrayList<>();
     try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
@@ -645,6 +745,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest
     return rows;
   }
 
+  private List<InputRow> readRows(DruidTombstoneSegmentReader reader) throws IOException
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
+      while (iterator.hasNext()) {
+        rows.addAll(reader.parseInputRows(iterator.next()));
+      }
+    }
+    return rows;
+  }
+
+
   private static HyperLogLogCollector makeHLLC(final String... values)
   {
     final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 07a9bd62de..7dd7591615 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -67,6 +68,7 @@ import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
 import org.apache.druid.tests.indexer.AbstractIndexerTest;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.joda.time.chrono.ISOChronology;
@@ -80,6 +82,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -502,10 +505,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           1,
           1,
           0);
-      Assert.assertEquals(
-          "14906",
-          compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize")
-      );
+      Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14906");
       // Run compaction again to compact the remaining day
       // Remaining day compacted (1 new segment). Now both days compacted (2 total)
       forceTriggerAutoCompaction(2);
@@ -530,7 +530,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   @Test
   public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception
   {
+    // Interval is "2013-08-31/2013-09-02", segment gran is DAY,
+    // "maxRowsPerSegment": 3
+    // input files:
+    //    "/resources/data/batch_index/json/wikipedia_index_data1.json",
+    //        3 rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z"
+    //     "/resources/data/batch_index/json/wikipedia_index_data2.json",
+    //       3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z"
+    //     "/resources/data/batch_index/json/wikipedia_index_data3.json"
+    //       4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z"
+    //      Summary of data:
+    //       5 rows @ 2013-08-31 and 5 @ 2013-09-01; only TWO days have data
+    //      Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones
+    //      1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each)
+    //              "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+    //      2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments)
+    //              "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+    //                             Four data segments (two months) and 10 tombstones for remaining months
+    //      3rd compaction: SEMESTER:  5 rows @ 2013-08-31 (two segments), 5 rows @ 2013-09-01 (two segments),
+    //               2 compaction tasks were generated for year 2013, one for each semester of the year.
+    //
     loadData(INDEX_TASK);
+
     try (final Closeable ignored = unloader(fullDatasourceName)) {
       final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
       intervalsBeforeCompaction.sort(null);
@@ -538,12 +559,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifySegmentsCount(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
+
+      LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true");
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to true
+      // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
       submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
-      LOG.info("Auto compaction test with YEAR segment granularity");
-
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       for (String interval : intervalsBeforeCompaction) {
         for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
@@ -555,30 +577,169 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifySegmentsCompacted(1, 1000);
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
-      newGranularity = Granularities.DAY;
+
+      LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true");
+      //  "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+      newGranularity = Granularities.MONTH;
       // Set dropExisting to true
       submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
-      LOG.info("Auto compaction test with DAY segment granularity");
+      // Since dropExisting is set to true...
+      // Again, the data lives in only two days.
+      // The earlier segment with YEAR granularity will be completely covered, overshadowed, by the
+      // new MONTH segments for data and tombstones for months with no data.
+      // Hence, we will only have the 2013-08 and 2013-09 months with data,
+      // plus 10 tombstones for the months with no data
+      final List<String> intervalsAfterYEARCompactionButBeforeMONTHCompaction =
+          coordinator.getSegmentIntervals(fullDatasourceName);
+      expectedIntervalAfterCompaction = new ArrayList<>();
+      for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(12);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifyTombstones(10);
+      verifySegmentsCompacted(12, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is true, over tombstones");
+      // The only reason to use semester rather than quarter or month granularity is to minimize test time
+      // while still ensuring that one of the compactions compacts *only* tombstones. The first semester will
+      // compact only tombstones, so the resulting segment should itself be a tombstone.
+      newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
 
       // Since dropExisting is set to true...
+      // The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the
+      // new P6M (semester) segments: data segments for semesters with data and tombstones for those without.
+      // Hence, we will have two segments: one tombstone for the first semester and one data segment for the second.
+      forceTriggerAutoCompaction(2); // two semesters compacted
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifyTombstones(1);
+      verifySegmentsCompacted(2, 1000);
+
+      expectedIntervalAfterCompaction =
+          Arrays.asList("2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z",
+                        "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z"
+          );
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      // verify that no new compaction task is triggered, since everything is already compacted
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+
+    }
+  }
+
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception
+  {
+    // Interval is "2013-08-31/2013-09-02", segment gran is DAY,
+    // "maxRowsPerSegment": 3
+    // input files:
+    //    "/resources/data/batch_index/json/wikipedia_index_data1.json",
+    //       3 rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z"
+    //     "/resources/data/batch_index/json/wikipedia_index_data2.json",
+    //       3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z"
+    //     "/resources/data/batch_index/json/wikipedia_index_data3.json"
+    //       4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z"
+    //      Summary of data:
+    //       5 rows @ 2013-08-31 and 5 rows @ 2013-09-01; only TWO days have data
+    //      Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones
+    //      1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each)
+    //              "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+    //      2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments)
+    //              "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+    //                             Four data segments (two months) and 10 tombstones for remaining months
+    //      3rd compaction: SEMESTER: 5 rows @ 2013-08-31 (two segments), 5 rows @ 2013-09-01 (two segments),
+    //               two compactions are generated for year 2013, one for each semester of the whole year.
+    //
+    loadData(INDEX_TASK);
+
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+
+      LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true");
+      Granularity newGranularity = Granularities.YEAR;
+      // Set dropExisting to true
+      // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+
+      LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true");
+      //  "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+      newGranularity = Granularities.MONTH;
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
+
+      // Since dropExisting is set to true...
+      // Again, data exists on only two days
       // The earlier segment with YEAR granularity will be completely covered, overshadowed, by the
-      // new DAY segments for data and tombstones for days with no data
-      // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02
-      // plus 363 tombstones
-      final List<String> intervalsAfterYEARCompactionButBeforeDAYCompaction =
+      // new MONTH segments for data and tombstones for months with no data
+      // Hence, we will only have the 2013-08 and 2013-09 months with data
+      // plus 10 tombstones
+      final List<String> intervalsAfterYEARCompactionButBeforeMONTHCompaction =
           coordinator.getSegmentIntervals(fullDatasourceName);
       expectedIntervalAfterCompaction = new ArrayList<>();
-      for (String interval : intervalsAfterYEARCompactionButBeforeDAYCompaction) {
+      for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) {
         for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
           expectedIntervalAfterCompaction.add(newinterval.toString());
         }
       }
-      forceTriggerAutoCompaction(365);
+      forceTriggerAutoCompaction(12);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifyTombstones(10);
+      verifySegmentsCompacted(12, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      // Now compact again over tombstones but with dropExisting set to false:
+      LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones");
+      newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
+      // Set dropExisting to false
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+
+      // Since dropExisting is set to false but the first semester contains only tombstones,
+      // the coordinator duty will force dropExisting to true for that interval.
+      // Hence, we will have two segments: one tombstone for the first semester and one data segment for the second.
+      forceTriggerAutoCompaction(2); // two semesters compacted
       verifyQuery(INDEX_QUERIES_RESOURCE);
-      verifyTombstones(363);
-      verifySegmentsCompacted(365, 1000);
+      verifyTombstones(1);
+      verifySegmentsCompacted(2, 1000);
+
+      expectedIntervalAfterCompaction =
+          Arrays.asList(
+              "2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z",
+              "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z"
+          );
       checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      // verify that no new compaction task is triggered, since everything is already compacted
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
     }
   }
 
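Note on the semester bucketing exercised by the two tests above: the P6M PeriodGranularity splits the
YEAR-compacted interval into exactly the two semester intervals that checkCompactionIntervals asserts.
Below is a minimal standalone sketch of that bucketing; it is not part of the patch, the class name and
main method are illustrative only, and it uses only APIs the tests themselves call (PeriodGranularity,
Granularity.getIterable, Joda's Interval).

    import org.apache.druid.java.util.common.granularity.PeriodGranularity;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;
    import org.joda.time.Period;
    import org.joda.time.chrono.ISOChronology;

    public class SemesterBucketingSketch
    {
      public static void main(String[] args)
      {
        // Semester (six-month) granularity with the default origin in UTC, as in the tests.
        final PeriodGranularity semester = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);

        // The YEAR-compacted interval that the MONTH and SEMESTER compactions operate over.
        final Interval year2013 = new Interval(
            "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
            ISOChronology.getInstanceUTC()
        );

        // Prints the two expected buckets:
        //   2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z  (only tombstones, no data)
        //   2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z  (data segment)
        for (Interval bucket : semester.getIterable(year2013)) {
          System.out.println(bucket);
        }
      }
    }
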
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index e3f47ddf24..ddae02298b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -427,6 +427,26 @@ public class CompactSegments implements CoordinatorCustomDuty
           dropExisting = config.getIoConfig().isDropExisting();
         }
 
+        // If all the segments found to be compacted are tombstones, then dropExisting
+        // needs to be forced to true. This forcing needs to happen when the flag is
+        // null or false. It is needed when it is null to avoid the possibility of the
+        // code deciding to default it to false later.
+        // Forcing the flag to true enables the task ingestion code to generate new, compacted
+        // tombstones that cover the tombstones found to be compacted, and to mark them
+        // as compacted (update their lastCompactionState). If the flag is not forced, then
+        // every time this compaction duty runs it will find the same tombstones in the
+        // interval, because their lastCompactionState was never set; this would repeat over
+        // and over, the duty would not make progress, and it would become stuck on this
+        // set of tombstones.
+        // This forcing code should be revisited when/if the auto-compaction policy for
+        // deciding which segments to compact changes.
+        if (dropExisting == null || !dropExisting) {
+          if (segmentsToCompact.stream().allMatch(dataSegment -> dataSegment.isTombstone())) {
+            dropExisting = true;
+            LOG.info("Forcing dropExisting to %s since all segments to compact are tombstones", dropExisting);
+          }
+        }
+
         // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
             "coordinator-issued",


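For reference, the dropExisting-forcing rule added above can be read in isolation as the short sketch
below. The helper class and method are hypothetical names that do not exist in this patch; the check
itself mirrors the added code: when every candidate segment is a tombstone, compaction must run with
dropExisting=true so the newly generated tombstones overshadow the old ones and their
lastCompactionState is recorded, letting the duty make progress on later runs.

    import java.util.List;

    import org.apache.druid.timeline.DataSegment;

    // Hypothetical helper that restates the forcing rule from CompactSegments above.
    final class DropExistingRule
    {
      static boolean resolveDropExisting(Boolean configuredDropExisting, List<DataSegment> segmentsToCompact)
      {
        // Treat a null flag the same as false so it cannot be defaulted to false later.
        boolean dropExisting = Boolean.TRUE.equals(configuredDropExisting);
        if (!dropExisting && segmentsToCompact.stream().allMatch(DataSegment::isTombstone)) {
          // All candidates are tombstones: force dropExisting so the duty does not get stuck
          // re-selecting the same tombstones on every run.
          dropExisting = true;
        }
        return dropExisting;
      }
    }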