Posted to commits@druid.apache.org by ma...@apache.org on 2021/11/09 05:17:44 UTC

[druid] branch master updated: Support changing dimension schema in Auto Compaction (#11874)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ddc68c6  Support changing dimension schema in Auto Compaction  (#11874)
ddc68c6 is described below

commit ddc68c6a816bb1d0a0d657c80feee5e7fbd199b7
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Mon Nov 8 21:17:08 2021 -0800

    Support changing dimension schema in Auto Compaction  (#11874)
    
    * add impl
    
    * add unit tests
    
    * fix checkstyle
    
    * add impl
    
    * add impl
    
    * add impl
    
    * add impl
    
    * add impl
    
    * add impl
    
    * fix test
    
    * add IT
    
    * add IT
    
    * fix docs
    
    * add test
    
    * address comments
    
    * fix conflict
---
 .../NewestSegmentFirstPolicyBenchmark.java         |   1 +
 .../org/apache/druid/timeline/CompactionState.java |  14 +-
 .../org/apache/druid/timeline/DataSegmentTest.java |  99 +++++++++++++
 docs/configuration/index.md                        |  12 +-
 docs/ingestion/compaction.md                       |  13 +-
 .../common/task/AbstractBatchIndexTask.java        |  21 ++-
 .../druid/indexing/common/task/IndexTask.java      |   3 +-
 .../parallel/ParallelIndexSupervisorTask.java      |   3 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   |   9 ++
 .../common/task/CompactionTaskParallelRunTest.java |   6 +
 .../common/task/CompactionTaskRunTest.java         |   2 +
 .../apache/druid/testing/utils/CompactionUtil.java |   1 +
 .../coordinator/duty/ITAutoCompactionTest.java     |  61 +++++++-
 .../duty/ITAutoCompactionUpgradeTest.java          |   1 +
 .../wikipedia_index_task_with_dimension_spec.json  |  86 ++++++++++++
 .../ClientCompactionTaskDimensionsSpec.java        |  88 ++++++++++++
 .../client/indexing/ClientCompactionTaskQuery.java |  14 +-
 .../client/indexing/HttpIndexingServiceClient.java |   2 +
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../coordinator/DataSourceCompactionConfig.java    |  12 ++
 .../UserCompactionTaskDimensionsConfig.java        |  88 ++++++++++++
 .../server/coordinator/duty/CompactSegments.java   |  20 ++-
 .../duty/NewestSegmentFirstIterator.java           |  28 ++++
 .../ClientCompactionTaskDimensionsSpecTest.java    |  71 ++++++++++
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../DataSourceCompactionConfigTest.java            |  41 ++++++
 .../UserCompactionTaskDimensionsConfigTest.java    |  71 ++++++++++
 .../coordinator/duty/CompactSegmentsTest.java      | 153 +++++++++++++++++++--
 .../coordinator/duty/KillCompactionConfigTest.java |   3 +
 .../duty/NewestSegmentFirstIteratorTest.java       |   9 ++
 .../duty/NewestSegmentFirstPolicyTest.java         | 141 ++++++++++++++++---
 .../CoordinatorCompactionConfigsResourceTest.java  |   4 +
 .../druid/sql/calcite/schema/SystemSchemaTest.java |   1 +
 website/.spelling                                  |   1 +
 34 files changed, 1031 insertions(+), 50 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index b93053e..4af61c6 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -101,6 +101,7 @@ public class NewestSegmentFirstPolicyBenchmark
               null,
               null,
               null,
+              null,
               null
           )
       );
diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
index 8588717..fff8234 100644
--- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java
+++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 
 import java.util.Map;
@@ -40,6 +41,7 @@ import java.util.Objects;
 public class CompactionState
 {
   private final PartitionsSpec partitionsSpec;
+  private final DimensionsSpec dimensionsSpec;
   // org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
   // has a dependency on the 'core' module where this class is.
   private final Map<String, Object> indexSpec;
@@ -50,11 +52,13 @@ public class CompactionState
   @JsonCreator
   public CompactionState(
       @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
+      @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
       @JsonProperty("indexSpec") Map<String, Object> indexSpec,
       @JsonProperty("granularitySpec") Map<String, Object> granularitySpec
   )
   {
     this.partitionsSpec = partitionsSpec;
+    this.dimensionsSpec = dimensionsSpec;
     this.indexSpec = indexSpec;
     this.granularitySpec = granularitySpec;
   }
@@ -66,6 +70,12 @@ public class CompactionState
   }
 
   @JsonProperty
+  public DimensionsSpec getDimensionsSpec()
+  {
+    return dimensionsSpec;
+  }
+
+  @JsonProperty
   public Map<String, Object> getIndexSpec()
   {
     return indexSpec;
@@ -88,6 +98,7 @@ public class CompactionState
     }
     CompactionState that = (CompactionState) o;
     return Objects.equals(partitionsSpec, that.partitionsSpec) &&
+           Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
            Objects.equals(indexSpec, that.indexSpec) &&
            Objects.equals(granularitySpec, that.granularitySpec);
   }
@@ -95,7 +106,7 @@ public class CompactionState
   @Override
   public int hashCode()
   {
-    return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
+    return Objects.hash(partitionsSpec, dimensionsSpec, indexSpec, granularitySpec);
   }
 
   @Override
@@ -103,6 +114,7 @@ public class CompactionState
   {
     return "CompactionState{" +
            "partitionsSpec=" + partitionsSpec +
+           ", dimensionsSpec=" + dimensionsSpec +
            ", indexSpec=" + indexSpec +
            ", granularitySpec=" + granularitySpec +
            '}';
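
With the new field, the lastCompactionState stored on a segment also carries the dimensions it was compacted with. A rough sketch of the serialized object, assuming the four fields shown in the hunks above and reusing the partitionsSpec and dimension names from the test below; dimension entries are written in shorthand name form here, while the actual serialization may spell out full schema objects:

    {
      "partitionsSpec": {
        "type": "hashed",
        "numShards": null,
        "partitionDimensions": ["dim1"],
        "partitionFunction": "murmur3_32_abs",
        "maxRowsPerSegment": 100000
      },
      "dimensionsSpec": {
        "dimensions": ["dim1", "bar", "foo"]
      },
      "indexSpec": {},
      "granularitySpec": {}
    }
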
diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 66c3400..0ed35da 100644
--- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.TestObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
@@ -120,6 +121,7 @@ public class DataSegmentTest
         new NumberedShardSpec(3, 0),
         new CompactionState(
             new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", "foo")), null, null),
             ImmutableMap.of(),
             ImmutableMap.of()
         ),
@@ -142,6 +144,7 @@ public class DataSegmentTest
     Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
     Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
     Assert.assertEquals(1, objectMap.get("size"));
+    Assert.assertEquals(4, ((Map) objectMap.get("lastCompactionState")).size());
 
     DataSegment deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
 
@@ -154,6 +157,7 @@ public class DataSegmentTest
     Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
     Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
     Assert.assertEquals(segment.getId(), deserializedSegment.getId());
+    Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());
 
     deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
     Assert.assertEquals(0, segment.compareTo(deserializedSegment));
@@ -166,6 +170,100 @@ public class DataSegmentTest
   }
 
   @Test
+  public void testDeserializationDataSegmentLastCompactionStateWithoutDimensionsSpec() throws Exception
+  {
+    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
+    DataSegment segment = new DataSegment(
+        "something",
+        interval,
+        "1",
+        loadSpec,
+        Arrays.asList("dim1", "dim2"),
+        Arrays.asList("met1", "met2"),
+        new NumberedShardSpec(3, 0),
+        new CompactionState(
+            new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+            null,
+            ImmutableMap.of(),
+            ImmutableMap.of()
+        ),
+        TEST_VERSION,
+        1
+    );
+    String lastCompactionStateWithoutDimensionsSpec = "{"
+                                                      + "\"dataSource\": \"something\","
+                                                      + "\"interval\": \"2011-10-01T00:00:00.000Z/2011-10-02T00:00:00.000Z\","
+                                                      + "\"version\": \"1\","
+                                                      + "\"loadSpec\": {"
+                                                      + " \"something\": \"or_other\""
+                                                      + "},"
+                                                      + "\"dimensions\": \"dim1,dim2\","
+                                                      + "\"metrics\": \"met1,met2\","
+                                                      + "\"shardSpec\": {"
+                                                      + " \"type\": \"numbered\","
+                                                      + " \"partitionNum\": 3,"
+                                                      + " \"partitions\": 0"
+                                                      + "},"
+                                                      + "\"lastCompactionState\": {"
+                                                      + " \"partitionsSpec\": {"
+                                                      + "  \"type\": \"hashed\","
+                                                      + "  \"numShards\": null,"
+                                                      + "  \"partitionDimensions\": [\"dim1\"],"
+                                                      + "  \"partitionFunction\": \"murmur3_32_abs\","
+                                                      + "  \"maxRowsPerSegment\": 100000"
+                                                      + " },"
+                                                      + " \"indexSpec\": {},"
+                                                      + " \"granularitySpec\": {}"
+                                                      + "},"
+                                                      + "\"binaryVersion\": 9,"
+                                                      + "\"size\": 1,"
+                                                      + "\"identifier\": \"something_2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z_1_3\""
+                                                      + "}";
+
+    final Map<String, Object> objectMap = MAPPER.readValue(
+        lastCompactionStateWithoutDimensionsSpec,
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+    Assert.assertEquals(11, objectMap.size());
+    Assert.assertEquals("something", objectMap.get("dataSource"));
+    Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+    Assert.assertEquals("1", objectMap.get("version"));
+    Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+    Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+    Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+    Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
+    Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+    Assert.assertEquals(1, objectMap.get("size"));
+    Assert.assertEquals(3, ((Map) objectMap.get("lastCompactionState")).size());
+
+    DataSegment deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+    Assert.assertEquals(segment.getDataSource(), deserializedSegment.getDataSource());
+    Assert.assertEquals(segment.getInterval(), deserializedSegment.getInterval());
+    Assert.assertEquals(segment.getVersion(), deserializedSegment.getVersion());
+    Assert.assertEquals(segment.getLoadSpec(), deserializedSegment.getLoadSpec());
+    Assert.assertEquals(segment.getDimensions(), deserializedSegment.getDimensions());
+    Assert.assertEquals(segment.getMetrics(), deserializedSegment.getMetrics());
+    Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
+    Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
+    Assert.assertEquals(segment.getId(), deserializedSegment.getId());
+    Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());
+    Assert.assertNotNull(segment.getLastCompactionState());
+    Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec());
+    Assert.assertNotNull(deserializedSegment.getLastCompactionState());
+    Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec());
+
+    deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+    Assert.assertEquals(0, segment.compareTo(deserializedSegment));
+
+    deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+    Assert.assertEquals(0, deserializedSegment.compareTo(segment));
+
+    deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+    Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
+  }
+
+  @Test
   public void testIdentifier()
   {
     final DataSegment segment = DataSegment.builder()
@@ -232,6 +330,7 @@ public class DataSegmentTest
   {
     final CompactionState compactionState = new CompactionState(
         new DynamicPartitionsSpec(null, null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null),
         Collections.singletonMap("test", "map"),
         Collections.singletonMap("test2", "map2")
     );
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 1eba2dc..e1a49b3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -945,7 +945,8 @@ A description of the compaction config is:
 |`skipOffsetFromLatest`|The offset for searching segments to be compacted in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Strongly recommended to set for realtime dataSources. See [Data handling with compaction](../ingestion/compaction.md#data-handling-with-compaction)|no (default = "P1D")|
 |`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
 |`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
-|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`granularitySpec`|Custom `granularitySpec`. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`dimensionsSpec`|Custom `dimensionsSpec`. See [Automatic compaction dimensionsSpec](#automatic-compaction-dimensions-spec)|No|
 |`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|
 
 An example of compaction config is:
@@ -988,9 +989,6 @@ The below is a list of the supported configurations for auto compaction.
 |`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|no (default = 5)|
 
 ###### Automatic compaction granularitySpec
-You can optionally use the `granularitySpec` object to configure the segment granularity of the compacted segments.
-
-`granularitySpec` takes the following keys:
 
 |Field|Description|Required|
 |-----|-----------|--------|
@@ -998,6 +996,12 @@ You can optionally use the `granularitySpec` object to configure the segment gra
 |`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
 |`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rollup, individual records can no longer be recovered. |No|
 
+###### Automatic compaction dimensions spec
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`dimensions`| A list of dimension names or objects. Defaults to 'null', which preserves the original dimensions. Note that setting this will cause segments manually compacted with `dimensionExclusions` to be compacted again.|No|
+
 ###### Automatic compaction IOConfig
 
 Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
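
For illustration, a datasource auto compaction config using the new `dimensionsSpec` field might look roughly like the following; the datasource name, offset, and dimension names are placeholders and not taken from this commit:

    {
      "dataSource": "wikipedia",
      "skipOffsetFromLatest": "P1D",
      "granularitySpec": {
        "segmentGranularity": "DAY"
      },
      "dimensionsSpec": {
        "dimensions": ["language", "country", "page"]
      }
    }
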
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index fa63a63..3762fd5 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -109,7 +109,7 @@ To perform a manual compaction, you submit a compaction task. Compaction tasks m
 |`id`|Task id|No|
 |`dataSource`|Data source name to compact|Yes|
 |`ioConfig`|I/O configuration for compaction task. See [Compaction I/O configuration](#compaction-io-configuration) for details.|Yes|
-|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one.|No|
+|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one. See [Compaction dimensionsSpec](#compaction-dimensions-spec) for details.|No|
 |`metricsSpec`|Custom metrics spec. The compaction task uses the specified metrics spec rather than generating one.|No|
 |`segmentGranularity`|When set, the compaction task changes the segment granularity for the given interval.  Deprecated. Use `granularitySpec`. |No.|
 |`tuningConfig`|[Parallel indexing task tuningConfig](native-batch.md#tuningconfig). `awaitSegmentAvailabilityTimeoutMillis` in the tuning config is not currently supported for compaction tasks. Do not set it to a non-zero value.|No|
@@ -181,6 +181,17 @@ Druid supports two supported `inputSpec` formats:
      |`type`|Task type. Should be `segments`|Yes|
      |`segments`|A list of segment IDs|Yes|
 
+
+### Compaction dimensions spec
+You can optionally use the `dimensionsSpec` object to configure the dimensions of the compacted segments.
+
+`dimensionsSpec` takes the following keys:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`dimensions`| A list of dimension names or objects. Cannot have the same column in both `dimensions` and `dimensionExclusions`. Defaults to `null`, which preserves the original dimensions.|No|
+|`dimensionExclusions`| The names of dimensions to exclude from compaction. Only names are supported here, not objects. This list is only used if the dimensions list is null or empty; otherwise it is ignored. Defaults to `[]`.|No|
+
 ### Compaction granularity spec
 
 You can optionally use the `granularitySpec` object to configure the segment granularity and the query granularity of the compacted segments. Their syntax is as follows:
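
Tying the dimensions spec table above together, here is a hedged sketch of a manual compaction task that pins the compacted dimensions; the interval and dimension names are placeholders, and the other fields follow the compaction task table earlier in this file:

    {
      "type": "compact",
      "dataSource": "wikipedia",
      "ioConfig": {
        "type": "compact",
        "inputSpec": {
          "type": "interval",
          "interval": "2013-08-31/2013-09-02"
        }
      },
      "dimensionsSpec": {
        "dimensions": ["page", "language"],
        "dimensionExclusions": []
      }
    }
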
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index a289221..7faaa09 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
@@ -56,6 +57,8 @@ import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IngestionSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -479,14 +482,22 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
   public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
       boolean storeCompactionState,
       TaskToolbox toolbox,
-      IndexTuningConfig tuningConfig,
-      GranularitySpec granularitySpec
+      IngestionSpec ingestionSpec
   )
   {
     if (storeCompactionState) {
-      final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
-      final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
-      final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap);
+      TuningConfig tuningConfig = ingestionSpec.getTuningConfig();
+      GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
+      // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them
+      DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null
+                                      ? null
+                                      : new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions(), null, null);
+      final CompactionState compactionState = new CompactionState(
+          tuningConfig.getPartitionsSpec(),
+          dimensionsSpec,
+          tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()),
+          granularitySpec.asMap(toolbox.getJsonMapper())
+      );
       return segments -> segments
           .stream()
           .map(s -> s.withLastCompactionState(compactionState))
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a993b53..ba59ba1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -925,8 +925,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
           compactionStateAnnotateFunction(
               storeCompactionState,
               toolbox,
-              ingestionSchema.getTuningConfig(),
-              ingestionSchema.getDataSchema().getGranularitySpec()
+              ingestionSchema
           );
 
       // Probably we can publish atomicUpdateGroup along with segments.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index ec5a5df..c185d71 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1051,8 +1051,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
         storeCompactionState,
         toolbox,
-        ingestionSchema.getTuningConfig(),
-        ingestionSchema.getDataSchema().getGranularitySpec()
+        ingestionSchema
     );
 
     Set<DataSegment> segmentsFoundForDrop = null;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 8d78d32..c1a3363 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -34,6 +35,7 @@ import org.apache.druid.client.indexing.ClientTaskQuery;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.guice.GuiceAnnotationIntrospector;
 import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
@@ -117,6 +119,7 @@ public class ClientCompactionTaskQuerySerdeTest
             100
         ),
         new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
+        new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
         ImmutableMap.of("key", "value")
     );
 
@@ -211,6 +214,10 @@ public class ClientCompactionTaskQuerySerdeTest
         task.getIoConfig().isDropExisting()
     );
     Assert.assertEquals(query.getContext(), task.getContext());
+    Assert.assertEquals(
+        query.getDimensionsSpec().getDimensions(),
+        task.getDimensionsSpec().getDimensions()
+    );
   }
 
   @Test
@@ -269,6 +276,7 @@ public class ClientCompactionTaskQuerySerdeTest
             )
         )
         .granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true))
+        .dimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), ImmutableList.of("__time", "val"), null))
         .build();
 
     final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -312,6 +320,7 @@ public class ClientCompactionTaskQuerySerdeTest
             100
         ),
         new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
+        new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
         new HashMap<>()
     );
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index e9ec434..c4f96c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -158,6 +158,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       // Expect compaction state to exist as store compaction state by default
       CompactionState expectedState = new CompactionState(
           new DynamicPartitionsSpec(null, Long.MAX_VALUE),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
@@ -198,6 +199,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
       CompactionState expectedState = new CompactionState(
           new HashedPartitionsSpec(null, 3, null),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
@@ -238,6 +240,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       CompactionState expectedState = new CompactionState(
           new SingleDimensionPartitionsSpec(7, null, "dim", false),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
@@ -281,6 +284,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
       CompactionState expectedState = new CompactionState(
           new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
@@ -321,6 +325,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       CompactionState expectedState = new CompactionState(
           new SingleDimensionPartitionsSpec(7, null, "dim", false),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
@@ -364,6 +369,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
       Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
       CompactionState expectedState = new CompactionState(
           new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+          new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
           compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
           getObjectMapper().readValue(
               getObjectMapper().writeValueAsString(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index eebd875..b1e3c03 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -194,6 +194,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
     // Expected compaction state to exist after compaction as we store compaction state by default
     return new CompactionState(
       new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
+      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
       mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
       mapper.readValue(
           mapper.writeValueAsString(
@@ -338,6 +339,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
         );
         CompactionState expectedState = new CompactionState(
             new HashedPartitionsSpec(null, 3, null),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
             compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
             getObjectMapper().readValue(
                 getObjectMapper().writeValueAsString(
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
index 5ccd5f4..6dcf790 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -69,6 +69,7 @@ public class CompactionUtil
             1
         ),
         null,
+        null,
         new UserCompactionTaskIOConfig(true),
         null
     );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 157b8d5..c7e5634 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -38,6 +39,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -76,6 +78,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
   private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
   private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
+  private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json";
   private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
@@ -175,7 +178,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       LOG.info("Auto compaction test with hash partitioning");
 
       final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
-      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
+      submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false);
       // 2 segments published per day after compaction.
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -190,7 +193,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           "city",
           false
       );
-      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
+      submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false);
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
       verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -679,6 +682,51 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     }
   }
 
+  @Test
+  public void testAutoCompactionDutyWithDimensionsSpec() throws Exception
+  {
+    // Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous",
+    // "namespace", "continent", "country", "region", "city"
+    loadData(INDEX_TASK_WITH_DIMENSION_SPEC);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 4 segments across 2 days (4 total)...
+      verifySegmentsCount(4);
+
+      // Result is not rollup
+      Map<String, Object> expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+      // Compact and change dimension to only "language"
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          null,
+          new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          false
+      );
+      forceTriggerAutoCompaction(2);
+
+      // Result should rollup on language dimension
+      expectedResult = ImmutableMap.of(
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify compacted segments does not get compacted again
+      forceTriggerAutoCompaction(2);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   private void loadData(String indexTask) throws Exception
   {
     loadData(indexTask, ImmutableMap.of());
@@ -752,7 +800,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
   private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception
   {
-    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting);
+    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, null, dropExisting);
+  }
+
+  private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, UserCompactionTaskDimensionsConfig dimensionsSpec, boolean dropExisting) throws Exception
+  {
+    submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dimensionsSpec, dropExisting);
   }
 
   private void submitCompactionConfig(
@@ -760,6 +813,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       Period skipOffsetFromLatest,
       int maxNumConcurrentSubTasks,
       UserCompactionTaskGranularityConfig granularitySpec,
+      UserCompactionTaskDimensionsConfig dimensionsSpec,
       boolean dropExisting
   ) throws Exception
   {
@@ -789,6 +843,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
             1
         ),
         granularitySpec,
+        dimensionsSpec,
         !dropExisting ? null : new UserCompactionTaskIOConfig(true),
         null
     );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index a4b1ed6..0ca16cf 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -97,6 +97,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
             1
         ),
         new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
+        null,
         new UserCompactionTaskIOConfig(true),
         null
     );
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json
new file mode 100644
index 0000000..1fa8b4e
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json
@@ -0,0 +1,86 @@
+{
+  "type": "index",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "user"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "user"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "DAY",
+        "intervals" : [ "2013-08-31/2013-09-02" ]
+      },
+      "parser": {
+        "parseSpec": {
+          "format" : "json",
+          "timestampSpec": {
+            "column": "timestamp"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "page",
+              {"type": "string", "name": "language", "createBitmapIndex": false},
+              "user",
+              "unpatrolled",
+              "newPage",
+              "robot",
+              "anonymous",
+              "namespace",
+              "continent",
+              "country",
+              "region",
+              "city"
+            ]
+          }
+        }
+      }
+    },
+    "ioConfig": {
+      "type": "index",
+      "firehose": {
+        "type": "local",
+        "baseDir": "/resources/data/batch_index/json",
+        "filter": "wikipedia_index_data*"
+      }
+    },
+    "tuningConfig": {
+      "type": "index",
+      "maxRowsPerSegment": 3,
+      "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%%
+    }
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java
new file mode 100644
index 0000000..4937bce
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParserUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Spec containing dimension configs for Compaction Task.
+ * This class mimics JSON field names for fields supported in compaction task with
+ * the corresponding fields in {@link org.apache.druid.data.input.impl.DimensionsSpec}.
+ * This is done for end-user ease of use. Basically, end-user will use the same syntax / JSON structure to set
+ * dimension configs for Compaction task as they would for any other ingestion task.
+ */
+public class ClientCompactionTaskDimensionsSpec
+{
+  @Nullable private final List<DimensionSchema> dimensions;
+
+  @JsonCreator
+  public ClientCompactionTaskDimensionsSpec(
+      @Nullable @JsonProperty("dimensions") List<DimensionSchema> dimensions
+  )
+  {
+    if (dimensions != null) {
+      List<String> dimensionNames = dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList());
+      ParserUtils.validateFields(dimensionNames);
+    }
+    this.dimensions = dimensions;
+  }
+
+  @Nullable
+  @JsonProperty
+  public List<DimensionSchema> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientCompactionTaskDimensionsSpec that = (ClientCompactionTaskDimensionsSpec) o;
+    return Objects.equals(dimensions, that.dimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimensions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionTaskDimensionsSpec{" +
+           "dimensions=" + dimensions +
+           '}';
+  }
+}
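
This spec travels inside the `compact` task query that the coordinator posts to the overlord (see ClientCompactionTaskQuery in the next file). A hedged sketch of what the `dimensionsSpec` block of that payload might contain, mirroring the mixed name/object style used elsewhere in this patch; the dimension names are placeholders:

    {
      "dimensions": [
        "language",
        {"type": "string", "name": "country", "createBitmapIndex": false}
      ]
    }
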
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index 62e39d6..89e5049 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -39,6 +39,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
   private final ClientCompactionIOConfig ioConfig;
   private final ClientCompactionTaskQueryTuningConfig tuningConfig;
   private final ClientCompactionTaskGranularitySpec granularitySpec;
+  private final ClientCompactionTaskDimensionsSpec dimensionsSpec;
   private final Map<String, Object> context;
 
   @JsonCreator
@@ -48,6 +49,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
       @JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
       @JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig,
       @JsonProperty("granularitySpec") ClientCompactionTaskGranularitySpec granularitySpec,
+      @JsonProperty("dimensionsSpec") ClientCompactionTaskDimensionsSpec dimensionsSpec,
       @JsonProperty("context") Map<String, Object> context
   )
   {
@@ -56,6 +58,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
     this.ioConfig = ioConfig;
     this.tuningConfig = tuningConfig;
     this.granularitySpec = granularitySpec;
+    this.dimensionsSpec = dimensionsSpec;
     this.context = context;
   }
 
@@ -99,12 +102,17 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
   }
 
   @JsonProperty
+  public ClientCompactionTaskDimensionsSpec getDimensionsSpec()
+  {
+    return dimensionsSpec;
+  }
+
+  @JsonProperty
   public Map<String, Object> getContext()
   {
     return context;
   }
 
-
   @Override
   public boolean equals(Object o)
   {
@@ -120,13 +128,14 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
            Objects.equals(ioConfig, that.ioConfig) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
            Objects.equals(granularitySpec, that.granularitySpec) &&
+           Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
            Objects.equals(context, that.context);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, context);
+    return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, dimensionsSpec, context);
   }
 
   @Override
@@ -138,6 +147,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
            ", ioConfig=" + ioConfig +
            ", tuningConfig=" + tuningConfig +
            ", granularitySpec=" + granularitySpec +
+           ", dimensionsSpec=" + dimensionsSpec +
            ", context=" + context +
            '}';
   }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index afab3b5..c2f80a0 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -81,6 +81,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
       @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
@@ -103,6 +104,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
         new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
         tuningConfig,
         granularitySpec,
+        dimensionsSpec,
         context
     );
     return runTask(taskId, taskQuery);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 84f9f55..d249989 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -41,6 +41,7 @@ public interface IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
       @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index dfe1f57..33d8043 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
   private final Period skipOffsetFromLatest;
   private final UserCompactionTaskQueryTuningConfig tuningConfig;
   private final UserCompactionTaskGranularityConfig granularitySpec;
+  private final UserCompactionTaskDimensionsConfig dimensionsSpec;
   private final UserCompactionTaskIOConfig ioConfig;
   private final Map<String, Object> taskContext;
 
@@ -59,6 +60,7 @@ public class DataSourceCompactionConfig
       @JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
       @JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
       @JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec,
+      @JsonProperty("dimensionsSpec") @Nullable UserCompactionTaskDimensionsConfig dimensionsSpec,
       @JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
       @JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
   )
@@ -75,6 +77,7 @@ public class DataSourceCompactionConfig
     this.tuningConfig = tuningConfig;
     this.ioConfig = ioConfig;
     this.granularitySpec = granularitySpec;
+    this.dimensionsSpec = dimensionsSpec;
     this.taskContext = taskContext;
   }
 
@@ -133,6 +136,13 @@ public class DataSourceCompactionConfig
 
   @JsonProperty
   @Nullable
+  public UserCompactionTaskDimensionsConfig getDimensionsSpec()
+  {
+    return dimensionsSpec;
+  }
+
+  @JsonProperty
+  @Nullable
   public Map<String, Object> getTaskContext()
   {
     return taskContext;
@@ -155,6 +165,7 @@ public class DataSourceCompactionConfig
            Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
            Objects.equals(tuningConfig, that.tuningConfig) &&
            Objects.equals(granularitySpec, that.granularitySpec) &&
+           Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
            Objects.equals(ioConfig, that.ioConfig) &&
            Objects.equals(taskContext, that.taskContext);
   }
@@ -170,6 +181,7 @@ public class DataSourceCompactionConfig
         skipOffsetFromLatest,
         tuningConfig,
         granularitySpec,
+        dimensionsSpec,
         ioConfig,
         taskContext
     );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java
new file mode 100644
index 0000000..bee7f35
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParserUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Spec containing dimension configs for the compaction task.
+ * This class mirrors the JSON field names of the corresponding fields in
+ * {@link org.apache.druid.data.input.impl.DimensionsSpec} that the compaction task supports.
+ * This is done for end-user ease of use: end users set dimension configs for the compaction task
+ * with the same syntax / JSON structure they would use for any other ingestion task.
+ */
+public class UserCompactionTaskDimensionsConfig
+{
+  @Nullable private final List<DimensionSchema> dimensions;
+
+  @JsonCreator
+  public UserCompactionTaskDimensionsConfig(
+      @Nullable @JsonProperty("dimensions") List<DimensionSchema> dimensions
+  )
+  {
+    if (dimensions != null) {
+      List<String> dimensionNames = dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList());
+      ParserUtils.validateFields(dimensionNames);
+    }
+    this.dimensions = dimensions;
+  }
+
+  @Nullable
+  @JsonProperty
+  public List<DimensionSchema> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    UserCompactionTaskDimensionsConfig that = (UserCompactionTaskDimensionsConfig) o;
+    return Objects.equals(dimensions, that.dimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimensions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "UserCompactionTaskDimensionsConfig{" +
+           "dimensions=" + dimensions +
+           '}';
+  }
+}
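
For orientation, a minimal sketch, not taken from the patch, of wiring the new spec into a datasource auto-compaction config, following the constructor order used by the tests further below; the datasource and dimension names are placeholders:

    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.data.input.impl.DimensionsSpec;
    import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
    import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
    import org.joda.time.Period;

    // Sketch only: "wikipedia", "countryName", and "cityName" are placeholder names.
    DataSourceCompactionConfig config = new DataSourceCompactionConfig(
        "wikipedia",          // dataSource
        null,                 // taskPriority (null -> default)
        500L,                 // inputSegmentSizeBytes
        null,                 // maxRowsPerSegment
        new Period(3600),     // skipOffsetFromLatest
        null,                 // tuningConfig
        null,                 // granularitySpec
        new UserCompactionTaskDimensionsConfig(
            DimensionsSpec.getDefaultSchemas(ImmutableList.of("countryName", "cityName"))
        ),
        null,                 // ioConfig
        ImmutableMap.of()     // taskContext
    );

When serialized, the spec surfaces under the "dimensionsSpec" key of the datasource config with a nested "dimensions" array, matching the @JsonProperty names in the diffs above.
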
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b58910a..adf3c96 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.duty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -341,16 +342,26 @@ public class CompactSegments implements CoordinatorDuty
         snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
 
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
-        ClientCompactionTaskGranularitySpec queryGranularitySpec;
+        // Create granularitySpec to send to compaction task
+        ClientCompactionTaskGranularitySpec granularitySpec;
         if (config.getGranularitySpec() != null) {
-          queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
+          granularitySpec = new ClientCompactionTaskGranularitySpec(
               config.getGranularitySpec().getSegmentGranularity(),
               config.getGranularitySpec().getQueryGranularity(),
               config.getGranularitySpec().isRollup()
 
           );
         } else {
-          queryGranularitySpec = null;
+          granularitySpec = null;
+        }
+        // Create dimensionsSpec to send to compaction task
+        ClientCompactionTaskDimensionsSpec dimensionsSpec;
+        if (config.getDimensionsSpec() != null) {
+          dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
+              config.getDimensionsSpec().getDimensions()
+          );
+        } else {
+          dimensionsSpec = null;
         }
 
         Boolean dropExisting = null;
@@ -364,7 +375,8 @@ public class CompactSegments implements CoordinatorDuty
             segmentsToCompact,
             config.getTaskPriority(),
             ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
-            queryGranularitySpec,
+            granularitySpec,
+            dimensionsSpec,
             dropExisting,
             newAutoCompactionContext(config.getTaskContext())
         );
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 4225fec..11dd2c4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -26,6 +26,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
@@ -424,6 +426,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
                                        existingGranularitySpec.isRollup() :
                                        null;
         if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
+          log.info(
+              "Configured rollup[%s] is different from the rollup[%s] of segments. Needs compaction",
+              config.getGranularitySpec().isRollup(),
+              existingRollup
+          );
           return true;
         }
       }
@@ -435,6 +442,27 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
                                                      existingGranularitySpec.getQueryGranularity() :
                                                      null;
         if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
+          log.info(
+              "Configured queryGranularity[%s] is different from the queryGranularity[%s] of segments. Needs compaction",
+              config.getGranularitySpec().getQueryGranularity(),
+              existingQueryGranularity
+          );
+          return true;
+        }
+      }
+    }
+
+    if (config.getDimensionsSpec() != null) {
+      final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
+      // Checks for list of dimensions
+      if (config.getDimensionsSpec().getDimensions() != null) {
+        final List<DimensionSchema> existingDimensions = existingDimensionsSpec != null ?
+                                                         existingDimensionsSpec.getDimensions() :
+                                                         null;
+        if (!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) {
+          log.info(
+              "Configured dimensionsSpec is different from the dimensionsSpec of segments. Needs compaction"
+          );
           return true;
         }
       }
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java
new file mode 100644
index 0000000..6b2df99
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ClientCompactionTaskDimensionsSpecTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ClientCompactionTaskDimensionsSpec.class)
+                  .withPrefabValues(
+                      DimensionSchema.class,
+                      new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true),
+                      new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true)
+                  )
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ClientCompactionTaskDimensionsSpec expected = new ClientCompactionTaskDimensionsSpec(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))
+    );
+    final ObjectMapper mapper = new ObjectMapper();
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final ClientCompactionTaskDimensionsSpec fromJson = (ClientCompactionTaskDimensionsSpec) mapper.readValue(
+        json,
+        ClientCompactionTaskDimensionsSpec.class
+    );
+    Assert.assertEquals(expected, fromJson);
+  }
+
+  @Test(expected = ParseException.class)
+  public void testInvalidDimensionsField()
+  {
+    final ClientCompactionTaskDimensionsSpec expected = new ClientCompactionTaskDimensionsSpec(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim"))
+    );
+  }
+}
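
As a usage note, the serde test above amounts to the following round trip; a sketch using a plain Jackson ObjectMapper, with the JSON in the comment shown only as a rough expectation:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.ImmutableList;
    import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
    import org.apache.druid.data.input.impl.DimensionsSpec;

    ClientCompactionTaskDimensionsSpec spec = new ClientCompactionTaskDimensionsSpec(
        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))
    );
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(spec);
    // Roughly: {"dimensions":[{"type":"string","name":"ts",...},{"type":"string","name":"dim",...}]}
    ClientCompactionTaskDimensionsSpec roundTripped =
        mapper.readValue(json, ClientCompactionTaskDimensionsSpec.class);
    // roundTripped is expected to equal spec, as asserted in the test above.
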
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 2035dba..5bc7b87 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
       int compactionTaskPriority,
       @Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
       @Nullable Boolean dropExisting,
       @Nullable Map<String, Object> context
   )
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 4f8fd12..27732ef 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -20,8 +20,10 @@
 package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
@@ -59,6 +61,7 @@ public class DataSourceCompactionConfigTest
         null,
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -86,6 +89,7 @@ public class DataSourceCompactionConfigTest
         null,
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -130,6 +134,7 @@ public class DataSourceCompactionConfigTest
         ),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -174,6 +179,7 @@ public class DataSourceCompactionConfigTest
         ),
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
 
@@ -240,6 +246,7 @@ public class DataSourceCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -267,6 +274,7 @@ public class DataSourceCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -297,6 +305,7 @@ public class DataSourceCompactionConfigTest
         null,
         null,
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -324,6 +333,7 @@ public class DataSourceCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(null, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -351,6 +361,7 @@ public class DataSourceCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(null, null, true),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -380,6 +391,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
+        null,
         new UserCompactionTaskIOConfig(true),
         ImmutableMap.of("key", "val")
     );
@@ -408,6 +420,7 @@ public class DataSourceCompactionConfigTest
         new Period(3600),
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
+        null,
         new UserCompactionTaskIOConfig(null),
         ImmutableMap.of("key", "val")
     );
@@ -424,4 +437,32 @@ public class DataSourceCompactionConfigTest
     Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
     Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
   }
+
+  @Test
+  public void testSerdeDimensionsSpec() throws IOException
+  {
+    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+        "dataSource",
+        null,
+        500L,
+        null,
+        new Period(3600),
+        null,
+        null,
+        new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
+        null,
+        ImmutableMap.of("key", "val")
+    );
+    final String json = OBJECT_MAPPER.writeValueAsString(config);
+    final DataSourceCompactionConfig fromJson = OBJECT_MAPPER.readValue(json, DataSourceCompactionConfig.class);
+
+    Assert.assertEquals(config.getDataSource(), fromJson.getDataSource());
+    Assert.assertEquals(25, fromJson.getTaskPriority());
+    Assert.assertEquals(config.getInputSegmentSizeBytes(), fromJson.getInputSegmentSizeBytes());
+    Assert.assertEquals(config.getMaxRowsPerSegment(), fromJson.getMaxRowsPerSegment());
+    Assert.assertEquals(config.getSkipOffsetFromLatest(), fromJson.getSkipOffsetFromLatest());
+    Assert.assertEquals(config.getTuningConfig(), fromJson.getTuningConfig());
+    Assert.assertEquals(config.getTaskContext(), fromJson.getTaskContext());
+    Assert.assertEquals(config.getDimensionsSpec(), fromJson.getDimensionsSpec());
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java
new file mode 100644
index 0000000..4862b45
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class UserCompactionTaskDimensionsConfigTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(UserCompactionTaskDimensionsConfig.class)
+                  .withPrefabValues(
+                      DimensionSchema.class,
+                      new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true),
+                      new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true)
+                  )
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final UserCompactionTaskDimensionsConfig expected = new UserCompactionTaskDimensionsConfig(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))
+    );
+    final ObjectMapper mapper = new ObjectMapper();
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    final UserCompactionTaskDimensionsConfig fromJson = (UserCompactionTaskDimensionsConfig) mapper.readValue(
+        json,
+        UserCompactionTaskDimensionsConfig.class
+    );
+    Assert.assertEquals(expected, fromJson);
+  }
+
+  @Test(expected = ParseException.class)
+  public void testInvalidDimensionsField()
+  {
+    final UserCompactionTaskDimensionsConfig expected = new UserCompactionTaskDimensionsConfig(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim"))
+    );
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 494f567..2da165a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.indexing.ClientCompactionIOConfig;
 import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.HttpIndexingServiceClient;
 import org.apache.druid.client.indexing.IndexingWorker;
 import org.apache.druid.client.indexing.IndexingWorkerInfo;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
@@ -66,6 +68,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -387,12 +390,12 @@ public class CompactSegmentsTest
         DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
         if (j == 3) {
           // Make two intervals on this day compacted (two compacted intervals back-to-back)
-          beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
-          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+          beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
+          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
         }
         if (j == 1) {
           // Make one interval on this day compacted
-          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+          afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
         }
         segments.add(beforeNoon);
         segments.add(afterNoon);
@@ -671,6 +674,7 @@ public class CompactSegmentsTest
             ),
             null,
             null,
+            null,
             null
         )
     );
@@ -685,6 +689,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // Only the same number of segments as the original PARTITION_PER_TIME_INTERVAL since segment granularity is the same
@@ -726,6 +731,7 @@ public class CompactSegmentsTest
                 null
             ),
             null,
+            null,
             new UserCompactionTaskIOConfig(true),
             null
         )
@@ -738,6 +744,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         dropExistingCapture.capture(),
         ArgumentMatchers.any()
     );
@@ -779,6 +786,7 @@ public class CompactSegmentsTest
             ),
             null,
             null,
+            null,
             null
         )
     );
@@ -790,6 +798,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.anyInt(),
         ArgumentMatchers.any(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         dropExistingCapture.capture(),
         ArgumentMatchers.any()
     );
@@ -831,6 +840,7 @@ public class CompactSegmentsTest
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
             null,
+            null,
             null
         )
     );
@@ -845,6 +855,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segment
@@ -857,6 +868,119 @@ public class CompactSegmentsTest
   }
 
   @Test
+  public void testCompactWithDimensionSpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskDimensionsSpec> dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskDimensionsSpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dimensionsSpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue();
+    Assert.assertNotNull(actual);
+    Assert.assertEquals(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), actual.getDimensions());
+  }
+
+  @Test
+  public void testCompactWithoutDimensionSpec()
+  {
+    final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
+    final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
+    final String dataSource = DATA_SOURCE_PREFIX + 0;
+    compactionConfigs.add(
+        new DataSourceCompactionConfig(
+            dataSource,
+            0,
+            500L,
+            null,
+            new Period("PT0H"), // smaller than segment interval
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                partitionsSpec,
+                null,
+                null,
+                null,
+                null,
+                null,
+                3,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null,
+            null,
+            null,
+            null
+        )
+    );
+    doCompactSegments(compactSegments, compactionConfigs);
+    ArgumentCaptor<ClientCompactionTaskDimensionsSpec> dimensionsSpecArgumentCaptor = ArgumentCaptor.forClass(
+        ClientCompactionTaskDimensionsSpec.class);
+    Mockito.verify(mockIndexingServiceClient).compactSegments(
+        ArgumentMatchers.anyString(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.anyInt(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
+        dimensionsSpecArgumentCaptor.capture(),
+        ArgumentMatchers.any(),
+        ArgumentMatchers.any()
+    );
+    ClientCompactionTaskDimensionsSpec actual = dimensionsSpecArgumentCaptor.getValue();
+    Assert.assertNull(actual);
+  }
+
+  @Test
   public void testCompactWithRollupInGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
@@ -891,6 +1015,7 @@ public class CompactSegmentsTest
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true),
             null,
+            null,
             null
         )
     );
@@ -905,6 +1030,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
@@ -947,6 +1073,7 @@ public class CompactSegmentsTest
             ),
             null,
             new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null),
+            null,
             null
         )
     );
@@ -983,6 +1110,7 @@ public class CompactSegmentsTest
             ),
             new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
             null,
+            null,
             null
         )
     );
@@ -1002,6 +1130,7 @@ public class CompactSegmentsTest
         ArgumentMatchers.any(),
         granularitySpecArgumentCaptor.capture(),
         ArgumentMatchers.any(),
+        ArgumentMatchers.any(),
         ArgumentMatchers.any()
     );
     // All segments are compacted at the same time since we changed the segment granularity to YEAR and all segment
@@ -1400,6 +1529,7 @@ public class CompactSegmentsTest
               ),
               null,
               null,
+              null,
               null
           )
       );
@@ -1502,7 +1632,7 @@ public class CompactSegmentsTest
       compactSegments(
           timeline,
           segments,
-          compactionTaskQuery.getTuningConfig()
+          compactionTaskQuery
       );
       return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
     }
@@ -1515,7 +1645,7 @@ public class CompactSegmentsTest
     private void compactSegments(
         VersionedIntervalTimeline<String, DataSegment> timeline,
         List<DataSegment> segments,
-        ClientCompactionTaskQueryTuningConfig tuningConfig
+        ClientCompactionTaskQuery clientCompactionTaskQuery
     )
     {
       Preconditions.checkArgument(segments.size() > 1);
@@ -1539,13 +1669,13 @@ public class CompactSegmentsTest
       final String version = "newVersion_" + compactVersionSuffix++;
       final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
       final PartitionsSpec compactionPartitionsSpec;
-      if (tuningConfig.getPartitionsSpec() instanceof DynamicPartitionsSpec) {
+      if (clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec() instanceof DynamicPartitionsSpec) {
         compactionPartitionsSpec = new DynamicPartitionsSpec(
-            tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
-            ((DynamicPartitionsSpec) tuningConfig.getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
+            clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(),
+            ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
         );
       } else {
-        compactionPartitionsSpec = tuningConfig.getPartitionsSpec();
+        compactionPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec();
       }
 
       for (int i = 0; i < 2; i++) {
@@ -1559,6 +1689,11 @@ public class CompactSegmentsTest
             shardSpecFactory.apply(i, 2),
             new CompactionState(
                 compactionPartitionsSpec,
+                clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec(
+                    clientCompactionTaskQuery.getDimensionsSpec().getDimensions(),
+                    null,
+                    null
+                ),
                 ImmutableMap.of(
                     "bitmap",
                     ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index a7d1d37..674c7f2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -232,6 +232,7 @@ public class KillCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
 
@@ -244,6 +245,7 @@ public class KillCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
@@ -348,6 +350,7 @@ public class KillCompactionConfigTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 092bc9f..1dc6908 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -93,6 +93,7 @@ public class NewestSegmentFirstIteratorTest
         null,
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -133,6 +134,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -173,6 +175,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -213,6 +216,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -253,6 +257,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -293,6 +298,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -333,6 +339,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -373,6 +380,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
@@ -413,6 +421,7 @@ public class NewestSegmentFirstIteratorTest
         ),
         null,
         null,
+        null,
         null
     );
     Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index a202b6d..82a6573 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -37,6 +38,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -696,13 +698,13 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         )
     );
 
@@ -729,13 +731,13 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
         )
     );
 
@@ -762,13 +764,13 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         )
     );
 
@@ -805,13 +807,13 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
         )
     );
 
@@ -848,7 +850,7 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         )
     );
 
@@ -900,7 +902,7 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, null)
+            new CompactionState(partitionsSpec, null, indexSpec, null)
         )
     );
 
@@ -954,19 +956,19 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "false"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "false"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "true"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "true"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of())
         )
     );
 
@@ -1014,19 +1016,19 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "day"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "day"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
         ),
         new SegmentGenerateSpec(
             Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+            new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of())
         )
     );
 
@@ -1058,6 +1060,100 @@ public class NewestSegmentFirstPolicyTest
   }
 
   @Test
+  public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentDimensions()
+  {
+    // Same indexSpec as what is set in the auto compaction config
+    Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
+    // Same partitionsSpec as what is set in the auto compaction config
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+
+    // Create segments that were compacted (CompactionState != null) and have
+    // Dimensions=["foo", "bar"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+    // Dimensions=["foo"] for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+    // Dimensions=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (the dimensions list was not set during the last compaction),
+    // and dimensionsSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (the dimensionsSpec was not set during the last compaction)
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")), null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, new DimensionsSpec(null, null, null), indexSpec, null)
+        ),
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(partitionsSpec, null, indexSpec, null)
+        )
+    );
+
+    // Auto compaction config sets Dimensions=["foo"]
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")))
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // We should get interval 2017-10-04T00:00:00/2017-10-05T00:00:00 first, then 2017-10-03T00:00:00/2017-10-04T00:00:00, then 2017-10-01T00:00:00/2017-10-02T00:00:00 (newest first).
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    Assert.assertTrue(iterator.hasNext());
+    expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+
+    // Auto compaction config sets Dimensions=null
+    iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            new UserCompactionTaskDimensionsConfig(null)
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
+  @Test
   public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
   {
     final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -1101,7 +1197,7 @@ public class NewestSegmentFirstPolicyTest
             Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
             new Period("P1D"),
             null,
-            new CompactionState(partitionsSpec, newIndexSpecMap, null)
+            new CompactionState(partitionsSpec, null, newIndexSpecMap, null)
         )
     );
 
@@ -1234,6 +1330,16 @@ public class NewestSegmentFirstPolicyTest
       UserCompactionTaskGranularityConfig granularitySpec
   )
   {
+    return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null);
+  }
+
+  private DataSourceCompactionConfig createCompactionConfig(
+      long inputSegmentSizeBytes,
+      Period skipOffsetFromLatest,
+      UserCompactionTaskGranularityConfig granularitySpec,
+      UserCompactionTaskDimensionsConfig dimensionsSpec
+  )
+  {
     return new DataSourceCompactionConfig(
         DATA_SOURCE,
         0,
@@ -1242,6 +1348,7 @@ public class NewestSegmentFirstPolicyTest
         skipOffsetFromLatest,
         null,
         granularitySpec,
+        dimensionsSpec,
         null,
         null
     );
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 353550d..fc16507 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -57,6 +57,7 @@ public class CoordinatorCompactionConfigsResourceTest
       null,
       new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
       null,
+      null,
       ImmutableMap.of("key", "val")
   );
   private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
@@ -152,6 +153,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     String author = "maytas";
@@ -192,6 +194,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
@@ -313,6 +316,7 @@ public class CoordinatorCompactionConfigsResourceTest
         null,
         new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
         null,
+        null,
         ImmutableMap.of("key", "val")
     );
     String author = "maytas";
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 3936855..72740a5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -275,6 +275,7 @@ public class SystemSchemaTest extends CalciteTestBase
 
   private final CompactionState expectedCompactionState = new CompactionState(
       new DynamicPartitionsSpec(null, null),
+      null,
       Collections.singletonMap("test", "map"),
       Collections.singletonMap("test2", "map2")
   );
diff --git a/website/.spelling b/website/.spelling
index c2b9dd9..9d474f7 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1775,6 +1775,7 @@ cpuacct
 dataSourceName
 datetime
 defaultHistory
+dimensionsSpec
 doubleMax
 doubleMin
 doubleSum

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org