Posted to commits@druid.apache.org by ma...@apache.org on 2021/04/09 07:12:53 UTC

[druid] branch master updated: Make dropExisting flag for Compaction configurable and add warning documentations (#11070)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4576152  Make dropExisting flag for Compaction configurable and add warning documentations (#11070)
4576152 is described below

commit 4576152e4a0213d17048a330e7089aa9d89f3972
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Apr 9 00:12:28 2021 -0700

    Make dropExisting flag for Compaction configurable and add warning documentations (#11070)
    
    * Make dropExisting flag for Compaction configurable
    
    * fix checkstyle
    
    * fix checkstyle
    
    * fix test
    
    * add tests
    
    * fix spelling
    
    * fix docs
    
    * add IT
    
    * fix test
    
    * fix doc
    
    * fix doc
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 docs/configuration/index.md                        |  10 ++
 docs/ingestion/compaction.md                       |  12 +-
 docs/ingestion/native-batch.md                     |   7 +-
 .../indexing/common/task/CompactionIOConfig.java   |  20 ++-
 .../druid/indexing/common/task/CompactionTask.java |  29 ++--
 .../druid/indexing/common/task/IndexTask.java      |   1 -
 .../task/ClientCompactionTaskQuerySerdeTest.java   |  12 +-
 .../common/task/CompactionTaskParallelRunTest.java |  46 ++++++-
 .../common/task/CompactionTaskRunTest.java         |  83 ++++++++++-
 .../indexing/common/task/CompactionTaskTest.java   |  89 ++++++++----
 .../coordinator/duty/ITAutoCompactionTest.java     | 151 +++++++++++++++++++--
 .../client/indexing/ClientCompactionIOConfig.java  |  21 ++-
 .../client/indexing/HttpIndexingServiceClient.java |   3 +-
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../apache/druid/segment/indexing/IOConfig.java    |   1 +
 .../coordinator/DataSourceCompactionConfig.java    |  12 ++
 .../coordinator/UserCompactionTaskIOConfig.java    |  39 +++---
 .../server/coordinator/duty/CompactSegments.java   |   6 +
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../DataSourceCompactionConfigTest.java            |  64 +++++++++
 .../coordinator/duty/CompactSegmentsTest.java      | 115 +++++++++++++++-
 .../duty/NewestSegmentFirstIteratorTest.java       |   9 ++
 .../duty/NewestSegmentFirstPolicyTest.java         |   1 +
 website/.spelling                                  |   1 +
 25 files changed, 644 insertions(+), 91 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index e744bf9..b93053e 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -100,6 +100,7 @@ public class NewestSegmentFirstPolicyBenchmark
               null,
               null,
               null,
+              null,
               null
           )
       );
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 30cc5eb..b56f752 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -867,6 +867,7 @@ A description of the compaction config is:
 |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
 |`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|
 
 An example of compaction config is:
 
@@ -918,6 +919,15 @@ You can optionally use the `granularitySpec` object to configure the segment gra
 
 > Unlike manual compaction, automatic compaction does not support query granularity.
 
+###### Automatic compaction IOConfig
+
+Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
+Below is a list of the supported configurations for auto compaction.
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|`dropExisting`|If `true`, then the generated compaction task drops (marks unused) all existing segments fully contained by the umbrella interval of the compacted segments when the task publishes new segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compacted `interval`. Note that changing this config does not cause intervals to be compacted again. [...]
+
 ### Overlord
 
 For general Overlord Process information, see [here](../design/overlord.md).
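
A minimal sketch of how the new `ioConfig` slots into a datasource auto-compaction config, per the table above (the datasource name and surrounding values are illustrative, not taken from this commit):

```json
{
  "dataSource": "wikipedia",
  "skipOffsetFromLatest": "P1D",
  "granularitySpec": { "segmentGranularity": "DAY" },
  "ioConfig": { "dropExisting": true }
}
```

Omitting `ioConfig`, or `dropExisting` inside it, keeps the default of `dropExisting = false` (see `IOConfig.DEFAULT_DROP_EXISTING` below).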
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index f6846dc..9f34381 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -54,7 +54,7 @@ See [Setting up a manual compaction task](#setting-up-manual-compaction) for mor
 ## Data handling with compaction
 During compaction, Druid overwrites the original set of segments with the compacted set. Druid also locks the segments for the time interval being compacted to ensure data consistency. By default, compaction tasks do not modify the underlying data. You can configure the compaction task to change the query granularity or add or remove dimensions in the compaction task. This means that the only changes to query results should be the result of intentional, not automatic, changes.
 
-For compaction tasks, `dropExisting` for underlying ingestion tasks is "true". This means that Druid can drop (mark unused) all the existing segments fully within interval for the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). 
+For compaction tasks, `dropExisting` in `ioConfig` can be set to `true` for Druid to drop (mark unused) all existing segments fully contained by the interval of the compaction task. For an example of why this is important, see the suggestion for reindexing with finer granularity under [Implementation considerations](native-batch.md#implementation-considerations). WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction ta [...]
 
 If an ingestion task needs to write data to a segment for a time interval locked for compaction, by default the ingestion task supersedes the compaction task and the compaction task fails without finishing. For manual compaction tasks you can adjust the input spec interval to avoid conflicts between ingestion and compaction. For automatic compaction, you can set the `skipOffsetFromLatest` key to adjust the auto compaction starting point from the current time to reduce the chance of c [...]
 
@@ -158,10 +158,12 @@ This task doesn't specify a `granularitySpec` so Druid retains the original segm
 
 The compaction `ioConfig` requires specifying `inputSpec` as follows:
 
-|Field|Description|Required|
-|-----|-----------|--------|
-|`type`|Task type. Should be `compact`|Yes|
-|`inputSpec`|Input specification|Yes|
+|Field|Description|Default|Required?|
+|-----|-----------|-------|--------|
+|`type`|Task type. Should be `compact`|none|Yes|
+|`inputSpec`|Input specification|none|Yes|
+|`dropExisting`|If `true`, then the compaction task drops (marks unused) all existing segments fully contained by either the `interval` in the `interval` type `inputSpec` or the umbrella interval of the `segments` in the `segment` type `inputSpec` when the task publishes new compacted segments. If compaction fails, Druid does not drop or mark unused any segments. WARNING: this functionality is still in beta and can result in temporary data unavailability for data within the compaction tas [...]
+
 
 There are two supported `inputSpec`s for now.
 
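A minimal sketch of the compaction `ioConfig` described in the table above, using the `interval` type `inputSpec` (the datasource and interval are hypothetical):

```json
{
  "type": "compact",
  "dataSource": "wikipedia",
  "ioConfig": {
    "type": "compact",
    "inputSpec": { "type": "interval", "interval": "2020-01-01/2021-01-01" },
    "dropExisting": true
  }
}
```

With `dropExisting` set to `true`, segments fully contained by `2020-01-01/2021-01-01` are marked unused once the task publishes the compacted segments.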
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 5e7b337..48539bb 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -91,7 +91,8 @@ The supported compression formats for native batch ingestion are `bz2`, `gz`, `x
   `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
 - You can set `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing segments that 
   start and end within your `granularitySpec`'s intervals. This applies whether or not the new data covers all existing segments. 
-  `dropExisting` only applies when `appendToExisting` is false and the  `granularitySpec` contains an `interval`. 
+  `dropExisting` only applies when `appendToExisting` is false and the `granularitySpec` contains an `interval`. WARNING: this 
+  functionality is still in beta and can result in temporary data unavailability for data within the specified `interval`.
   
   The following examples demonstrate when to set the `dropExisting` property to true in the `ioConfig`:
   
@@ -220,7 +221,7 @@ that range if there's some stray data with unexpected timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if  [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if  [...]
 
 ### `tuningConfig`
 
@@ -749,7 +750,7 @@ that range if there's some stray data with unexpected timestamps.
 |type|The task type, this should always be "index".|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
-|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if  [...]
+|dropExisting|If `true` and `appendToExisting` is `false` and the `granularitySpec` contains an`interval`, then the ingestion task drops (mark unused) all existing segments fully contained by the specified `interval` when the task publishes new segments. If ingestion fails, Druid does not drop or mark unused any segments. In the case of misconfiguration where either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec`, Druid does not drop any segments even if  [...]
 
 ### `tuningConfig`
 
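A minimal sketch of a Parallel task `ioConfig` in which `dropExisting` takes effect, i.e. `appendToExisting` is `false` (per the row above, the task's `granularitySpec` must also contain an `interval`; the input paths are hypothetical):

```json
{
  "type": "index_parallel",
  "ioConfig": {
    "type": "index_parallel",
    "inputSource": { "type": "local", "baseDir": "/tmp/data", "filter": "*.json" },
    "inputFormat": { "type": "json" },
    "appendToExisting": false,
    "dropExisting": true
  }
}
```
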
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
index 60972de..faedd3d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
@@ -36,11 +37,16 @@ import java.util.Objects;
 public class CompactionIOConfig implements IOConfig
 {
   private final CompactionInputSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
+  public CompactionIOConfig(
+      @JsonProperty("inputSpec") CompactionInputSpec inputSpec,
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
     this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
@@ -49,6 +55,12 @@ public class CompactionIOConfig implements IOConfig
     return inputSpec;
   }
 
+  @JsonProperty
+  public boolean isDropExisting()
+  {
+    return dropExisting;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -59,13 +71,14 @@ public class CompactionIOConfig implements IOConfig
       return false;
     }
     CompactionIOConfig that = (CompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    return dropExisting == that.dropExisting &&
+           Objects.equals(inputSpec, that.inputSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(inputSpec, dropExisting);
   }
 
   @Override
@@ -73,6 +86,7 @@ public class CompactionIOConfig implements IOConfig
   {
     return "CompactionIOConfig{" +
            "inputSpec=" + inputSpec +
+           ", dropExisting=" + dropExisting +
            '}';
   }
 }
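Given the `@Nullable Boolean` constructor parameter above, a serialized `CompactionIOConfig` may omit `dropExisting` entirely, in which case it falls back to `IOConfig.DEFAULT_DROP_EXISTING` (`false`). A sketch of the JSON form this class binds (the interval is illustrative):

```json
{
  "type": "compact",
  "inputSpec": { "type": "interval", "interval": "2019-01-01/2020-01-01" },
  "dropExisting": true
}
```

With `"dropExisting"` absent, `isDropExisting()` returns `false`.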
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 18f5f71..1961de1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -203,11 +203,11 @@ public class CompactionTask extends AbstractBatchIndexTask
     if (ioConfig != null) {
       this.ioConfig = ioConfig;
     } else if (interval != null) {
-      this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null));
+      this.ioConfig = new CompactionIOConfig(new CompactionIntervalSpec(interval, null), null);
     } else {
       // We already checked segments is not null or empty above.
       //noinspection ConstantConditions
-      this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments));
+      this.ioConfig = new CompactionIOConfig(SpecificSegmentsSpec.fromSegments(segments), null);
     }
 
     this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
@@ -424,7 +424,8 @@ public class CompactionTask extends AbstractBatchIndexTask
         granularitySpec,
         toolbox.getCoordinatorClient(),
         segmentLoaderFactory,
-        retryPolicyFactory
+        retryPolicyFactory,
+        ioConfig.isDropExisting()
     );
     final List<ParallelIndexSupervisorTask> indexTaskSpecs = IntStream
         .range(0, ingestionSpecs.size())
@@ -532,7 +533,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
       final CoordinatorClient coordinatorClient,
       final SegmentLoaderFactory segmentLoaderFactory,
-      final RetryPolicyFactory retryPolicyFactory
+      final RetryPolicyFactory retryPolicyFactory,
+      final boolean dropExisting
   ) throws IOException, SegmentLoadingException
   {
     NonnullPair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
@@ -614,7 +616,8 @@ public class CompactionTask extends AbstractBatchIndexTask
                     interval,
                     coordinatorClient,
                     segmentLoaderFactory,
-                    retryPolicyFactory
+                    retryPolicyFactory,
+                    dropExisting
                 ),
                 compactionTuningConfig
             )
@@ -641,7 +644,8 @@ public class CompactionTask extends AbstractBatchIndexTask
                   segmentProvider.interval,
                   coordinatorClient,
                   segmentLoaderFactory,
-                  retryPolicyFactory
+                  retryPolicyFactory,
+                  dropExisting
               ),
               compactionTuningConfig
           )
@@ -655,7 +659,8 @@ public class CompactionTask extends AbstractBatchIndexTask
       Interval interval,
       CoordinatorClient coordinatorClient,
       SegmentLoaderFactory segmentLoaderFactory,
-      RetryPolicyFactory retryPolicyFactory
+      RetryPolicyFactory retryPolicyFactory,
+      boolean dropExisting
   )
   {
     return new ParallelIndexIOConfig(
@@ -675,7 +680,7 @@ public class CompactionTask extends AbstractBatchIndexTask
         ),
         null,
         false,
-        true
+        dropExisting
     );
   }
 
@@ -1062,7 +1067,13 @@ public class CompactionTask extends AbstractBatchIndexTask
 
     public Builder inputSpec(CompactionInputSpec inputSpec)
     {
-      this.ioConfig = new CompactionIOConfig(inputSpec);
+      this.ioConfig = new CompactionIOConfig(inputSpec, null);
+      return this;
+    }
+
+    public Builder inputSpec(CompactionInputSpec inputSpec, Boolean dropExisting)
+    {
+      this.ioConfig = new CompactionIOConfig(inputSpec, dropExisting);
       return this;
     }
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 75ae4d2..3b759bc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -1046,7 +1046,6 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
   public static class IndexIOConfig implements BatchIOConfig
   {
     private static final boolean DEFAULT_APPEND_TO_EXISTING = false;
-    private static final boolean DEFAULT_DROP_EXISTING = false;
 
     private final FirehoseFactory firehoseFactory;
     private final InputSource inputSource;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 6e93f3a..04b1b00 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -83,7 +83,8 @@ public class ClientCompactionTaskQuerySerdeTest
             new ClientCompactionIntervalSpec(
                 Intervals.of("2019/2020"),
                 "testSha256OfSortedSegmentIds"
-            )
+            ),
+            true
         ),
         new ClientCompactionTaskQueryTuningConfig(
             null,
@@ -201,6 +202,10 @@ public class ClientCompactionTaskQuerySerdeTest
         query.getGranularitySpec().getSegmentGranularity(),
         task.getGranularitySpec().getSegmentGranularity()
     );
+    Assert.assertEquals(
+        query.getIoConfig().isDropExisting(),
+        task.getIoConfig().isDropExisting()
+    );
     Assert.assertEquals(query.getContext(), task.getContext());
   }
 
@@ -214,7 +219,7 @@ public class ClientCompactionTaskQuerySerdeTest
         new RetryPolicyFactory(new RetryPolicyConfig())
     );
     final CompactionTask task = builder
-        .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"))
+        .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds"), true)
         .tuningConfig(
             new ParallelIndexTuningConfig(
                 null,
@@ -269,7 +274,8 @@ public class ClientCompactionTaskQuerySerdeTest
             new ClientCompactionIntervalSpec(
                 Intervals.of("2019/2020"),
                 "testSha256OfSortedSegmentIds"
-            )
+            ),
+            true
         ),
         new ClientCompactionTaskQueryTuningConfig(
             100,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 95826fa..95f7e3a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -443,7 +443,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   }
 
   @Test
-  public void testCompactionDropSegmentsOfInputInterval()
+  public void testCompactionDropSegmentsOfInputIntervalIfDropFlagIsSet()
   {
     runIndexTask(null, true);
 
@@ -459,7 +459,8 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
         RETRY_POLICY_FACTORY
     );
     final CompactionTask compactionTask = builder
-        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        // Set the dropExisting flag to true in the IOConfig of the compaction task
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null), true)
         .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
         .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
         .build();
@@ -475,6 +476,47 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
     }
   }
 
+  @Test
+  public void testCompactionDoesNotDropSegmentsIfDropFlagNotSet()
+  {
+    runIndexTask(null, true);
+
+    Collection<DataSegment> usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
+    Assert.assertEquals(3, usedSegments.size());
+    for (DataSegment segment : usedSegments) {
+      Assert.assertTrue(Granularities.HOUR.isAligned(segment.getInterval()));
+    }
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING)
+        .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, null))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+
+    usedSegments = getCoordinatorClient().fetchUsedSegmentsInDataSourceForIntervals(DATA_SOURCE, ImmutableList.of(INTERVAL_TO_INDEX));
+    // None of the HOUR segments were dropped since the MINUTE segments did not fully cover the 3-hour interval.
+    Assert.assertEquals(6, usedSegments.size());
+    int hourSegmentCount = 0;
+    int minuteSegmentCount = 0;
+    for (DataSegment segment : usedSegments) {
+      if (Granularities.MINUTE.isAligned(segment.getInterval())) {
+        minuteSegmentCount++;
+      }
+      if (Granularities.HOUR.isAligned(segment.getInterval())) {
+        hourSegmentCount++;
+      }
+    }
+    Assert.assertEquals(3, hourSegmentCount);
+    Assert.assertEquals(3, minuteSegmentCount);
+  }
+
   private void runIndexTask(@Nullable PartitionsSpec partitionsSpec, boolean appendToExisting)
   {
     ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 384d26f..03acabd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -803,7 +803,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }
 
   @Test
-  public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception
+  public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingTrue() throws Exception
   {
     // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
     if (lockGranularity == LockGranularity.SEGMENT) {
@@ -842,8 +842,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
     );
 
     final CompactionTask partialCompactionTask = builder
-        .interval(compactionPartialInterval)
         .segmentGranularity(Granularities.MINUTE)
+        // Set dropExisting to true
+        .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true)
         .build();
 
     final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
@@ -865,8 +866,9 @@ public class CompactionTaskRunTest extends IngestionTestBase
     Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
 
     final CompactionTask fullCompactionTask = builder
-        .interval(Intervals.of("2014-01-01/2014-01-02"))
         .segmentGranularity(null)
+        // Set dropExisting to true
+        .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), true)
         .build();
 
     final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
@@ -904,6 +906,81 @@ public class CompactionTaskRunTest extends IngestionTestBase
   }
 
   @Test
+  public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception
+  {
+    // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return;
+    }
+
+    runIndexTask();
+
+    final Set<DataSegment> expectedSegments = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentLoaderFactory,
+        RETRY_POLICY_FACTORY
+    );
+
+    final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+    final CompactionTask partialCompactionTask = builder
+        .segmentGranularity(Granularities.MINUTE)
+        // Set dropExisting to false
+        .inputSpec(new CompactionIntervalSpec(partialInterval, null), false)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+    Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+    // All segments in the previous expectedSegments should still appear as they have larger segment granularity.
+    expectedSegments.addAll(partialCompactionResult.rhs);
+
+    final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+
+    Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+
+    final CompactionTask fullCompactionTask = builder
+        .segmentGranularity(null)
+        // Set dropExisting to false
+        .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01/2014-01-02"), null), false)
+        .build();
+
+    final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
+    Assert.assertTrue(fullCompactionResult.lhs.isSuccess());
+
+    final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
+        getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+            DATA_SOURCE,
+            Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+            Segments.ONLY_VISIBLE
+        )
+    );
+    segmentsAfterFullCompaction.sort(
+        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
+    );
+
+    Assert.assertEquals(3, segmentsAfterFullCompaction.size());
+    for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
+      Assert.assertEquals(
+          Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
+          segmentsAfterFullCompaction.get(i).getInterval()
+      );
+    }
+  }
+
+  @Test
   public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
   {
     runIndexTask();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index f9f3a66..c4faa5b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -109,6 +109,7 @@ import org.apache.druid.segment.data.ListIndexed;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
@@ -848,7 +849,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -865,7 +867,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -919,7 +922,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -937,7 +941,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -991,7 +996,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1009,7 +1015,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1063,7 +1070,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1081,7 +1089,8 @@ public class CompactionTaskTest
         SEGMENT_INTERVALS,
         tuningConfig,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1125,7 +1134,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
 
     ingestionSpecs.sort(
@@ -1143,7 +1153,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1167,7 +1178,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
 
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
@@ -1185,7 +1197,8 @@ public class CompactionTaskTest
         Arrays.asList(customMetricsSpec),
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1202,7 +1215,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1219,7 +1233,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1243,7 +1258,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1266,7 +1282,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1300,7 +1317,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -1319,7 +1337,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1336,7 +1355,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null)),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1353,7 +1373,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        new PeriodGranularity(Period.months(3), null, null)
+        new PeriodGranularity(Period.months(3), null, null),
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1373,7 +1394,8 @@ public class CompactionTaskTest
         ),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
         new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
@@ -1392,7 +1414,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         Collections.singletonList(COMPACTION_INTERVAL),
         new PeriodGranularity(Period.months(3), null, null),
-        new PeriodGranularity(Period.months(3), null, null)
+        new PeriodGranularity(Period.months(3), null, null),
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1409,7 +1432,8 @@ public class CompactionTaskTest
         null,
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1426,7 +1450,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1444,7 +1469,8 @@ public class CompactionTaskTest
         new ClientCompactionTaskGranularitySpec(null, null),
         COORDINATOR_CLIENT,
         segmentLoaderFactory,
-        RETRY_POLICY_FACTORY
+        RETRY_POLICY_FACTORY,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
     final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
 
@@ -1461,7 +1487,8 @@ public class CompactionTaskTest
         AGGREGATORS,
         SEGMENT_INTERVALS,
         Granularities.MONTH,
-        Granularities.NONE
+        Granularities.NONE,
+        IOConfig.DEFAULT_DROP_EXISTING
     );
   }
 
@@ -1569,7 +1596,8 @@ public class CompactionTaskTest
       List<AggregatorFactory> expectedMetricsSpec,
       List<Interval> expectedSegmentIntervals,
       Granularity expectedSegmentGranularity,
-      Granularity expectedQueryGranularity
+      Granularity expectedQueryGranularity,
+      boolean expectedDropExisting
   )
   {
     assertIngestionSchema(
@@ -1615,7 +1643,8 @@ public class CompactionTaskTest
             null
         ),
         expectedSegmentGranularity,
-        expectedQueryGranularity
+        expectedQueryGranularity,
+        expectedDropExisting
     );
   }
 
@@ -1626,7 +1655,8 @@ public class CompactionTaskTest
       List<Interval> expectedSegmentIntervals,
       CompactionTask.CompactionTuningConfig expectedTuningConfig,
       Granularity expectedSegmentGranularity,
-      Granularity expectedQueryGranularity
+      Granularity expectedQueryGranularity,
+      boolean expectedDropExisting
   )
   {
     Preconditions.checkArgument(
@@ -1673,6 +1703,7 @@ public class CompactionTaskTest
       // assert ioConfig
       final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
       Assert.assertFalse(ioConfig.isAppendToExisting());
+      Assert.assertEquals(expectedDropExisting, ioConfig.isDropExisting());
       final InputSource inputSource = ioConfig.getInputSource();
       Assert.assertTrue(inputSource instanceof DruidInputSource);
       final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 990d573..05593c4 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.CompactionResourceTestClient;
@@ -168,7 +169,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       LOG.info("Auto compaction test with hash partitioning");
 
       final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
-      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null);
+      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
       // 2 segments published per day after compaction.
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -183,7 +184,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           "city",
           false
       );
-      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null);
+      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -287,7 +288,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   }
 
   @Test
-  public void testAutoCompactionDutyWithSegmentGranularity() throws Exception
+  public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception
   {
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -298,7 +299,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -314,10 +316,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
       newGranularity = Granularities.DAY;
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       LOG.info("Auto compaction test with DAY segment granularity");
 
+      // Since dropExisting is set to true...
       // The earlier segment with YEAR granularity will be dropped post-compaction
       // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
       expectedIntervalAfterCompaction = new ArrayList<>();
@@ -334,6 +338,58 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   }
 
   @Test
+  public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingFalse() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      Granularity newGranularity = Granularities.YEAR;
+      // Set dropExisting to false
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      LOG.info("Auto compaction test with YEAR segment granularity");
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      newGranularity = Granularities.DAY;
+      // Set dropExisting to false
+      submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      LOG.info("Auto compaction test with DAY segment granularity");
+
+      // Since dropExisting is set to false...
+      // The earlier segment with YEAR granularity is still 'used' as it's not fully overshadowed.
+      // This is because we only have newer version on 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02.
+      // The version for the YEAR segment is still the latest for 2013-01-01 to 2013-08-31 and 2013-09-02 to 2014-01-01.
+      // Hence, all three segments are available and the expected intervals are combined from the DAY and YEAR segment granularities
+      // (which are 2013-08-31 to 2013-09-01, 2013-09-01 to 2013-09-02 and 2013-01-01 to 2014-01-01)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(3);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(3, 1000);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+    }
+  }
+
+  @Test
   public void testAutoCompactionDutyWithSegmentGranularityAndMixedVersion() throws Exception
   {
     loadData(INDEX_TASK);
@@ -437,7 +493,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   }
 
   @Test
-  public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline() throws Exception
+  public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingTrue() throws Exception
   {
     loadData(INDEX_TASK);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
@@ -448,7 +504,8 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
       Granularity newGranularity = Granularities.YEAR;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
@@ -473,7 +530,9 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       checkCompactionIntervals(expectedIntervalAfterCompaction);
 
       newGranularity = Granularities.MONTH;
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null));
+      // Set dropExisting to true
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), true);
+      // Since dropExisting is set to true...
       // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
       expectedIntervalAfterCompaction = new ArrayList<>();
       // The previous segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) will be dropped
@@ -491,6 +550,71 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithSegmentGranularityAndSmallerSegmentGranularityCoveringMultipleSegmentsInTimelineAndDropExistingFalse() throws Exception
+  {
+    loadData(INDEX_TASK);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+
+      Granularity newGranularity = Granularities.YEAR;
+      // Set dropExisting to false
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+
+      List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+      // We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      forceTriggerAutoCompaction(1);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      loadData(INDEX_TASK);
+      verifySegmentsCount(5);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      // 5 segments. 1 compacted YEAR segment and 4 newly ingested DAY segments across 2 days
+      // We will have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from the compaction earlier
+      // two segments with interval of 2013-08-31/2013-09-01 (newly ingested with DAY)
+      // and two segments with interval of 2013-09-01/2013-09-02 (newly ingested with DAY)
+      expectedIntervalAfterCompaction.addAll(intervalsBeforeCompaction);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+      newGranularity = Granularities.MONTH;
+      // Set dropExisting to false
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null), false);
+      // This will submit a single compaction task for interval of 2013-01-01/2014-01-01 with MONTH granularity
+      expectedIntervalAfterCompaction = new ArrayList<>();
+      // Since dropExisting is set to false...
+      // We wil have one segment with interval of 2013-01-01/2014-01-01 (compacted with YEAR) from before the compaction
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : Granularities.YEAR.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+      // one segment with interval of 2013-08-01/2013-09-01 (compacted with MONTH)
+      // and one segment with interval of 2013-09-01/2013-10-01 (compacted with MONTH)
+      for (String interval : intervalsBeforeCompaction) {
+        for (Interval newinterval : Granularities.MONTH.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+          expectedIntervalAfterCompaction.add(newinterval.toString());
+        }
+      }
+
+      forceTriggerAutoCompaction(3);
+      verifyQuery(INDEX_QUERIES_RESOURCE);
+      verifySegmentsCompacted(3, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(expectedIntervalAfterCompaction);
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
@@ -537,14 +661,20 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
   private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec) throws Exception
   {
-    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec);
+    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, false);
+  }
+
+  private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception
+  {
+    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting);
   }
 
   private void submitCompactionConfig(
       PartitionsSpec partitionsSpec,
       Period skipOffsetFromLatest,
       int maxNumConcurrentSubTasks,
-      UserCompactionTaskGranularityConfig granularitySpec
+      UserCompactionTaskGranularityConfig granularitySpec,
+      boolean dropExisting
   ) throws Exception
   {
     DataSourceCompactionConfig compactionConfig = new DataSourceCompactionConfig(
@@ -573,6 +703,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
             1
         ),
         granularitySpec,
+        !dropExisting ? null : new UserCompactionTaskIOConfig(true),
         null
     );
     compactionResource.submitCompactionConfig(compactionConfig);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
index eec0954..0419bc2 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionIOConfig.java
@@ -21,7 +21,9 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
@@ -34,11 +36,16 @@ public class ClientCompactionIOConfig
   private static final String TYPE = "compact";
 
   private final ClientCompactionIntervalSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public ClientCompactionIOConfig(@JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec)
+  public ClientCompactionIOConfig(
+      @JsonProperty("inputSpec") ClientCompactionIntervalSpec inputSpec,
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
     this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
@@ -53,6 +60,12 @@ public class ClientCompactionIOConfig
     return inputSpec;
   }
 
+  @JsonProperty
+  public boolean isDropExisting()
+  {
+    return dropExisting;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -63,13 +76,14 @@ public class ClientCompactionIOConfig
       return false;
     }
     ClientCompactionIOConfig that = (ClientCompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    return dropExisting == that.dropExisting &&
+           Objects.equals(inputSpec, that.inputSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(inputSpec, dropExisting);
   }
 
   @Override
@@ -77,6 +91,7 @@ public class ClientCompactionIOConfig
   {
     return "ClientCompactionIOConfig{" +
            "inputSpec=" + inputSpec +
+           ", dropExisting=" + dropExisting +
            '}';
   }
 }
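
For clarity on the serde change above: passing null for the new dropExisting field falls back to IOConfig.DEFAULT_DROP_EXISTING, so payloads written before this commit keep their old behavior. A minimal sketch, where inputSpec stands for any in-scope ClientCompactionIntervalSpec:

    // Explicit opt-in vs. a legacy payload with the field absent.
    ClientCompactionIOConfig explicit = new ClientCompactionIOConfig(inputSpec, true);
    ClientCompactionIOConfig legacy = new ClientCompactionIOConfig(inputSpec, null);
    // explicit.isDropExisting() == true
    // legacy.isDropExisting() == false, via IOConfig.DEFAULT_DROP_EXISTING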
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index bc9f9c5..d8f8f35 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -80,6 +80,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
   {
@@ -98,7 +99,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     final ClientTaskQuery taskQuery = new ClientCompactionTaskQuery(
         taskId,
         dataSource,
-        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments)),
+        new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
         tuningConfig,
         granularitySpec,
         context
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index f7c9712..14dfcfe 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -41,6 +41,7 @@ public interface IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   );
 
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
index b178480..58f84c2 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/IOConfig.java
@@ -30,4 +30,5 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 })
 public interface IOConfig
 {
+  boolean DEFAULT_DROP_EXISTING = false;
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index d64eb08..e7a4240 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
   private final Period skipOffsetFromLatest;
   private final UserCompactionTaskQueryTuningConfig tuningConfig;
   private final UserCompactionTaskGranularityConfig granularitySpec;
+  private final UserCompactionTaskIOConfig ioConfig;
   private final Map<String, Object> taskContext;
 
   @JsonCreator
@@ -58,6 +59,7 @@ public class DataSourceCompactionConfig
       @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
       @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
       @JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec,
+      @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
   {
@@ -71,6 +73,7 @@ public class DataSourceCompactionConfig
     this.maxRowsPerSegment = maxRowsPerSegment;
     this.skipOffsetFromLatest = skipOffsetFromLatest == null ? DEFAULT_SKIP_OFFSET_FROM_LATEST : skipOffsetFromLatest;
     this.tuningConfig = tuningConfig;
+    this.ioConfig = ioConfig;
     if (granularitySpec != null) {
       Preconditions.checkArgument(
           granularitySpec.getQueryGranularity() == null,
@@ -121,6 +124,13 @@ public class DataSourceCompactionConfig
 
   @JsonProperty
   @Nullable
+  public UserCompactionTaskIOConfig getIoConfig()
+  {
+    return ioConfig;
+  }
+
+  @JsonProperty
+  @Nullable
   public UserCompactionTaskGranularityConfig getGranularitySpec()
   {
     return granularitySpec;
@@ -150,6 +160,7 @@ public class DataSourceCompactionConfig
            Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
            Objects.equals(granularitySpec, that.granularitySpec) &&
+           Objects.equals(ioConfig, that.ioConfig) &&
            Objects.equals(taskContext, that.taskContext);
   }
 
@@ -164,6 +175,7 @@ public class DataSourceCompactionConfig
         skipOffsetFromLatest,
         tuningConfig,
         granularitySpec,
+        ioConfig,
         taskContext
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
similarity index 54%
copy from indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
copy to server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
index 60972de..df5af28 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionIOConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskIOConfig.java
@@ -17,36 +17,39 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.task;
+package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.segment.indexing.IOConfig;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
- * {@link IOConfig} for {@link CompactionTask}. Should be synchronized with {@link
- * org.apache.druid.client.indexing.ClientCompactionIOConfig}.
- *
- * @see CompactionInputSpec
+ * Spec containing IO configs for Auto Compaction.
+ * For the fields supported in auto compaction, this class mimics the JSON field names of
+ * the corresponding fields in {@link IOConfig}.
+ * This is done for end-user ease of use: end users set IO configs for Auto Compaction with the
+ * same syntax / JSON structure they would use for any other ingestion task.
+ * Note that this class simply holds the IO configs and passes them to the compaction task spec.
  */
-@JsonTypeName("compact")
-public class CompactionIOConfig implements IOConfig
+public class UserCompactionTaskIOConfig
 {
-  private final CompactionInputSpec inputSpec;
+  private final boolean dropExisting;
 
   @JsonCreator
-  public CompactionIOConfig(@JsonProperty("inputSpec") CompactionInputSpec inputSpec)
+  public UserCompactionTaskIOConfig(
+      @JsonProperty("dropExisting") @Nullable Boolean dropExisting
+  )
   {
-    this.inputSpec = inputSpec;
+    this.dropExisting = dropExisting == null ? IOConfig.DEFAULT_DROP_EXISTING : dropExisting;
   }
 
   @JsonProperty
-  public CompactionInputSpec getInputSpec()
+  public boolean isDropExisting()
   {
-    return inputSpec;
+    return dropExisting;
   }
 
   @Override
@@ -58,21 +61,21 @@ public class CompactionIOConfig implements IOConfig
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    CompactionIOConfig that = (CompactionIOConfig) o;
-    return Objects.equals(inputSpec, that.inputSpec);
+    UserCompactionTaskIOConfig that = (UserCompactionTaskIOConfig) o;
+    return dropExisting == that.dropExisting;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(inputSpec);
+    return Objects.hash(dropExisting);
   }
 
   @Override
   public String toString()
   {
-    return "CompactionIOConfig{" +
-           "inputSpec=" + inputSpec +
+    return "UserCompactionTaskIOConfig{" +
+           "dropExisting=" + dropExisting +
            '}';
   }
 }
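
Putting the new pieces together, a hedged sketch of wiring UserCompactionTaskIOConfig into a datasource compaction config; the constructor arguments mirror the serde tests later in this diff, and the JSON shape in the trailing comment is inferred from the @JsonProperty names:

    DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "dataSource",
        null,                                  // taskPriority (defaults)
        500L,                                  // inputSegmentSizeBytes
        null,                                  // maxRowsPerSegment
        new Period(3600),                      // skipOffsetFromLatest
        null,                                  // tuningConfig
        null,                                  // granularitySpec
        new UserCompactionTaskIOConfig(true),  // opt in to dropExisting
        ImmutableMap.of("key", "val")          // taskContext
    );
    // Serializes with: ... "ioConfig": {"dropExisting": true} ...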
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b047758..5d31751 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -320,6 +320,11 @@ public class CompactSegments implements CoordinatorDuty
           queryGranularitySpec = null;
         }
 
+        Boolean dropExisting = null;
+        if (config.getIoConfig() != null) {
+          dropExisting = config.getIoConfig().isDropExisting();
+        }
+
         // make tuningConfig
         final String taskId = indexingServiceClient.compactSegments(
             "coordinator-issued",
@@ -327,6 +332,7 @@ public class CompactSegments implements CoordinatorDuty
             config.getTaskPriority(),
             ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
             queryGranularitySpec,
+            dropExisting,
             newAutoCompactionContext(config.getTaskContext())
         );
 
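One subtlety in the CompactSegments change above: dropExisting is a Boolean so that "user set no ioConfig" stays distinguishable from an explicit false until the task payload is built. A condensed sketch of the handoff (UserCompactionTaskIOConfig itself already defaults a null flag to false, so its getter never returns null):

    Boolean dropExisting = (config.getIoConfig() == null)
                           ? null
                           : config.getIoConfig().isDropExisting();
    // A null here is later collapsed to IOConfig.DEFAULT_DROP_EXISTING (false)
    // by ClientCompactionIOConfig when HttpIndexingServiceClient builds the task.
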
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index c5a85fc..600b4be 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index efe04e3..c798d29 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -58,6 +58,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -84,6 +85,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -127,6 +129,7 @@ public class DataSourceCompactionConfigTest
             null
         ),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -170,6 +173,7 @@ public class DataSourceCompactionConfigTest
             null
         ),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
 
@@ -235,6 +239,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -261,6 +266,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, Granularities.MONTH),
+        null,
         ImmutableMap.of("key", "val")
     );
   }
@@ -276,6 +282,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -302,6 +309,62 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(null, null),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+  }
+
+  @Test
+  public void testSerdeIOConfigWithNonNullDropExisting() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskIOConfig(true),
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
+  }
+
+  @Test
+  public void testSerdeIOConfigWithNullDropExisting() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
+        new UserCompactionTaskIOConfig(null),
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -315,5 +378,6 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
     Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
     Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
+    Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 4bec733..90b3b5d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -592,6 +593,7 @@ public class CompactSegmentsTest
                 null
             ),
             null,
+            null,
             null
         )
     );
@@ -605,6 +607,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
    // Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since segment granularity is the same
@@ -613,6 +616,110 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testCompactWithNotNullIOConfig()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            new UserCompactionTaskIOConfig(true),
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dropExistingCapture.capture(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertEquals(true, dropExistingCapture.getValue());
+  }
+
+  @Test
+  public void testCompactWithNullIOConfig()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<Boolean> dropExistingCapture = ArgumentCaptor.forClass(Boolean.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dropExistingCapture.capture(),
+        ArgumentMatchers.any()
+    );
+    Assert.assertNull(dropExistingCapture.getValue());
+  }
+
+  @Test
   public void testCompactWithGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
@@ -646,6 +753,7 @@ public class CompactSegmentsTest
                 null
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            null,
             null
         )
     );
@@ -659,6 +767,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
    // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segment
@@ -698,7 +807,8 @@ public class CompactSegmentsTest
                 new ClientCompactionIntervalSpec(
                     Intervals.of("2000/2099"),
                     "testSha256OfSortedSegmentIds"
-                )
+                ),
+                null
             ),
             null,
             new ClientCompactionTaskGranularitySpec(Granularities.DAY, null),
@@ -737,6 +847,7 @@ public class CompactSegmentsTest
                 null
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null),
+            null,
             null
         )
     );
@@ -755,6 +866,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
    // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segment
@@ -1059,6 +1171,7 @@ public class CompactSegmentsTest
                   null
               ),
               null,
+              null,
               null
           )
       );
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index b32a7b7..0cf0ded 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -92,6 +92,7 @@ public class NewestSegmentFirstIteratorTest
         null,
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -131,6 +132,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -170,6 +172,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -209,6 +212,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -248,6 +252,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -287,6 +292,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -326,6 +332,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -365,6 +372,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -404,6 +412,7 @@ public class NewestSegmentFirstIteratorTest
             null
         ),
         null,
+        null,
         null
     );
     Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index 9b7f600..5ccd52c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -1068,6 +1068,7 @@ public class NewestSegmentFirstPolicyTest
         skipOffsetFromLatest,
         null,
         granularitySpec,
+        null,
         null
     );
   }
diff --git a/website/.spelling b/website/.spelling
index ed803c7..5c7237b 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1656,6 +1656,7 @@ HadoopIndexTasks
 HttpEmitter
 HttpPostEmitter
 InetAddress.getLocalHost
+IOConfig
 JRE8u60
 KeyManager
 L1
