You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2022/04/08 18:02:11 UTC

[druid] branch master updated: Add a new flag for ingestion to preserve existing metrics (#12185)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8edea5a82d Add a new flag for ingestion to preserve existing metrics (#12185)
8edea5a82d is described below

commit 8edea5a82dfdb34c3177c0efd7a79bae54751ab8
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Fri Apr 8 11:02:02 2022 -0700

    Add a new flag for ingestion to preserve existing metrics (#12185)
    
    * add impl
    
    * add impl
    
    * fix checkstyle
    
    * add impl
    
    * add unit test
    
    * fix stuff
    
    * fix stuff
    
    * fix stuff
    
    * add unit test
    
    * add more unit tests
    
    * add more unit tests
    
    * add IT
    
    * add IT
    
    * add IT
    
    * add IT
    
    * add ITs
    
    * address comments
    
    * fix test
    
    * fix test
    
    * fix test
    
    * address comments
    
    * address comments
    
    * address comments
    
    * fix conflict
    
    * fix checkstyle
    
    * address comments
    
    * fix test
    
    * fix checkstyle
    
    * fix test
    
    * fix test
    
    * fix IT
---
 .../apache/druid/data/input/InputRowSchema.java    |  27 ++
 .../druid/indexing/input/DruidInputSource.java     |  35 ++-
 .../druid/indexing/input/InputRowSchemas.java      |   6 +-
 .../task/ClientCompactionTaskQuerySerdeTest.java   |   5 +-
 .../druid/indexing/input/DruidInputSourceTest.java |  78 ++++++
 .../druid/indexing/input/InputRowSchemasTest.java  |  71 +++++
 integration-tests/pom.xml                          |   5 +
 .../apache/druid/testing/utils/CompactionUtil.java |   1 +
 .../coordinator/duty/ITAutoCompactionTest.java     | 297 ++++++++++++++++++---
 .../duty/ITAutoCompactionUpgradeTest.java          |   1 +
 .../wikipedia_index_no_rollup_preserve_metric.json |  76 ++++++
 .../wikipedia_index_rollup_preserve_metric.json    |  95 +++++++
 .../indexer/wikipedia_index_sketch_queries.json    |  49 ++++
 .../incremental/AppendableIndexBuilder.java        |  21 +-
 .../segment/incremental/IncrementalIndex.java      |  10 +
 .../incremental/OnheapIncrementalIndex.java        | 165 ++++++++++--
 .../druid/segment/data/IncrementalIndexTest.java   | 285 +++++++++++++++++++-
 .../incremental/IncrementalIndexAdapterTest.java   |   2 +-
 .../OnheapIncrementalIndexBenchmark.java           |   2 +
 .../ClientCompactionTaskQueryTuningConfig.java     |  25 +-
 .../UserCompactionTaskQueryTuningConfig.java       |   3 +
 .../DataSourceCompactionConfigTest.java            |  44 +++
 .../UserCompactionTaskQueryTuningConfigTest.java   |   3 +
 .../coordinator/duty/CompactSegmentsTest.java      |  14 +
 .../duty/NewestSegmentFirstIteratorTest.java       |   8 +
 .../duty/NewestSegmentFirstPolicyTest.java         | 109 +++++++-
 26 files changed, 1358 insertions(+), 79 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
index 227bd3a6d1..3c4263ba99 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputRowSchema.java
@@ -19,9 +19,13 @@
 
 package org.apache.druid.data.input;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
 
+import javax.validation.constraints.NotNull;
+import java.util.Set;
+
 /**
  * Schema of {@link InputRow}.
  */
@@ -30,16 +34,39 @@ public class InputRowSchema
   private final TimestampSpec timestampSpec;
   private final DimensionsSpec dimensionsSpec;
   private final ColumnsFilter columnsFilter;
+  /**
+   * Set of metric names for further downstream processing by {@link InputSource}.
+   * Empty set if no metric given.
+   */
+  @NotNull
+  private final Set<String> metricNames;
 
   public InputRowSchema(
       final TimestampSpec timestampSpec,
       final DimensionsSpec dimensionsSpec,
       final ColumnsFilter columnsFilter
   )
+  {
+    this(timestampSpec, dimensionsSpec, columnsFilter, ImmutableSet.of());
+  }
+
+  public InputRowSchema(
+      final TimestampSpec timestampSpec,
+      final DimensionsSpec dimensionsSpec,
+      final ColumnsFilter columnsFilter,
+      final Set<String> metricNames
+  )
   {
     this.timestampSpec = timestampSpec;
     this.dimensionsSpec = dimensionsSpec;
     this.columnsFilter = columnsFilter;
+    this.metricNames = metricNames == null ? ImmutableSet.of() : metricNames;
+  }
+
+  @NotNull
+  public Set<String> getMetricNames()
+  {
+    return metricNames;
   }
 
   public TimestampSpec getTimestampSpec()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index ff1dd617f1..6f1a6d6446 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -23,12 +23,14 @@ import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.ColumnsFilter;
 import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
@@ -241,8 +243,26 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 
     final DruidSegmentInputFormat inputFormat = new DruidSegmentInputFormat(indexIO, dimFilter);
 
+    return new InputEntityIteratingReader(
+        getInputRowSchemaToUse(inputRowSchema),
+        inputFormat,
+        entityIterator,
+        temporaryDirectory
+    );
+  }
+
+  @VisibleForTesting
+  InputRowSchema getInputRowSchemaToUse(InputRowSchema inputRowSchema)
+  {
     final InputRowSchema inputRowSchemaToUse;
 
+    ColumnsFilter columnsFilterToUse = inputRowSchema.getColumnsFilter();
+    if (inputRowSchema.getMetricNames() != null) {
+      for (String metricName : inputRowSchema.getMetricNames()) {
+        columnsFilterToUse = columnsFilterToUse.plus(metricName);
+      }
+    }
+
     if (taskConfig.isIgnoreTimestampSpecForDruidInputSource()) {
       // Legacy compatibility mode; see https://github.com/apache/druid/pull/10267.
       LOG.warn("Ignoring the provided timestampSpec and reading the __time column instead. To use timestampSpecs with "
@@ -251,10 +271,14 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
       inputRowSchemaToUse = new InputRowSchema(
           new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, STANDARD_TIME_COLUMN_FORMATS.iterator().next(), null),
           inputRowSchema.getDimensionsSpec(),
-          inputRowSchema.getColumnsFilter().plus(ColumnHolder.TIME_COLUMN_NAME)
+          columnsFilterToUse.plus(ColumnHolder.TIME_COLUMN_NAME)
       );
     } else {
-      inputRowSchemaToUse = inputRowSchema;
+      inputRowSchemaToUse = new InputRowSchema(
+          inputRowSchema.getTimestampSpec(),
+          inputRowSchema.getDimensionsSpec(),
+          columnsFilterToUse
+      );
     }
 
     if (ColumnHolder.TIME_COLUMN_NAME.equals(inputRowSchemaToUse.getTimestampSpec().getTimestampColumn())
@@ -268,12 +292,7 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
       );
     }
 
-    return new InputEntityIteratingReader(
-        inputRowSchemaToUse,
-        inputFormat,
-        entityIterator,
-        temporaryDirectory
-    );
+    return inputRowSchemaToUse;
   }
 
   private List<TimelineObjectHolder<String, DataSegment>> createTimeline()
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
index f273be7922..c895eb14b7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/InputRowSchemas.java
@@ -29,6 +29,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.transform.Transform;
 import org.apache.druid.segment.transform.TransformSpec;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -56,7 +57,10 @@ public class InputRowSchemas
             dataSchema.getDimensionsSpec(),
             dataSchema.getTransformSpec(),
             dataSchema.getAggregators()
-        )
+        ),
+        Arrays.stream(dataSchema.getAggregators())
+              .map(AggregatorFactory::getName)
+              .collect(Collectors.toSet())
     );
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index d2ed8b31e5..29952ebc1c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -57,6 +57,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
@@ -93,6 +94,7 @@ public class ClientCompactionTaskQuerySerdeTest
             true
         ),
         new ClientCompactionTaskQueryTuningConfig(
+            null,
             null,
             40000,
             2000L,
@@ -249,7 +251,7 @@ public class ClientCompactionTaskQuerySerdeTest
             new ParallelIndexTuningConfig(
                 null,
                 null,
-                null,
+                new OnheapIncrementalIndex.Spec(true),
                 40000,
                 2000L,
                 null,
@@ -313,6 +315,7 @@ public class ClientCompactionTaskQuerySerdeTest
         ),
         new ClientCompactionTaskQueryTuningConfig(
             100,
+            new OnheapIncrementalIndex.Spec(true),
             40000,
             2000L,
             30000L,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
index ebc2b94f32..2989415a5e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidInputSourceTest.java
@@ -23,8 +23,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -35,12 +40,15 @@ import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.TestHelper;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Arrays;
+
 public class DruidInputSourceTest
 {
   private final IndexIO indexIO = EasyMock.createMock(IndexIO.class);
@@ -221,4 +229,74 @@ public class DruidInputSourceTest
 
     mapper.readValue(json, InputSource.class);
   }
+
+  @Test
+  public void testReaderColumnsFilterWithMetricGiven()
+  {
+    String datasource = "foo";
+    Interval interval = Intervals.of("2000/2001");
+    String column = "c1";
+    String metricName = "m1";
+    ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
+    InputRowSchema inputRowSchema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        new DimensionsSpec(
+            DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
+        ),
+        originalColumnsFilter,
+        ImmutableSet.of(metricName)
+    );
+    DruidInputSource druidInputSource = new DruidInputSource(
+        datasource,
+        interval,
+        null,
+        null,
+        ImmutableList.of("a"),
+        ImmutableList.of("b"),
+        indexIO,
+        coordinatorClient,
+        segmentCacheManagerFactory,
+        retryPolicyFactory,
+        taskConfig
+    );
+    InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
+    ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
+    Assert.assertTrue(columnsFilter.apply(column));
+    Assert.assertTrue(columnsFilter.apply(metricName));
+  }
+
+  @Test
+  public void testReaderColumnsFilterWithNoMetricGiven()
+  {
+    String datasource = "foo";
+    Interval interval = Intervals.of("2000/2001");
+    String column = "c1";
+    String metricName = "m1";
+    ColumnsFilter originalColumnsFilter = ColumnsFilter.inclusionBased(ImmutableSet.of(column));
+    InputRowSchema inputRowSchema = new InputRowSchema(
+        new TimestampSpec("timestamp", "auto", null),
+        new DimensionsSpec(
+            DimensionsSpec.getDefaultSchemas(Arrays.asList("timestamp", "a", "b"))
+        ),
+        originalColumnsFilter,
+        ImmutableSet.of()
+    );
+    DruidInputSource druidInputSource = new DruidInputSource(
+        datasource,
+        interval,
+        null,
+        null,
+        ImmutableList.of("a"),
+        ImmutableList.of("b"),
+        indexIO,
+        coordinatorClient,
+        segmentCacheManagerFactory,
+        retryPolicyFactory,
+        taskConfig
+    );
+    InputRowSchema inputSourceReader = druidInputSource.getInputRowSchemaToUse(inputRowSchema);
+    ColumnsFilter columnsFilter = inputSourceReader.getColumnsFilter();
+    Assert.assertTrue(columnsFilter.apply(column));
+    Assert.assertFalse(columnsFilter.apply(metricName));
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
index ae6ed8816d..991a5950f9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/InputRowSchemasTest.java
@@ -23,18 +23,28 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.config.NullHandlingTest;
 import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.transform.ExpressionTransform;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 public class InputRowSchemasTest extends NullHandlingTest
 {
   @Test
@@ -98,4 +108,65 @@ public class InputRowSchemasTest extends NullHandlingTest
         columnsFilter
     );
   }
+
+  @Test
+  public void testFromDataSchema()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2"),
+            new LongDimensionSchema("d3"),
+            new FloatDimensionSchema("d4"),
+            new DoubleDimensionSchema("d5")
+        )
+    );
+    DataSchema schema = new DataSchema(
+        "dataSourceName",
+        new TimestampSpec(null, null, null),
+        dimensionsSpec,
+        new AggregatorFactory[]{
+            new CountAggregatorFactory("count"),
+            new LongSumAggregatorFactory("met", "met")
+        },
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+        null
+    );
+
+    InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
+    Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
+    Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
+    Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
+    Assert.assertEquals(ImmutableSet.of("count", "met"), inputRowSchema.getMetricNames());
+  }
+
+  @Test
+  public void testFromDataSchemaWithNoAggregator()
+  {
+    TimestampSpec timestampSpec = new TimestampSpec(null, null, null);
+    DimensionsSpec dimensionsSpec = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("d1"),
+            new StringDimensionSchema("d2"),
+            new LongDimensionSchema("d3"),
+            new FloatDimensionSchema("d4"),
+            new DoubleDimensionSchema("d5")
+        )
+    );
+    DataSchema schema = new DataSchema(
+        "dataSourceName",
+        new TimestampSpec(null, null, null),
+        dimensionsSpec,
+        new AggregatorFactory[]{},
+        new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null),
+        null
+    );
+
+    InputRowSchema inputRowSchema = InputRowSchemas.fromDataSchema(schema);
+    Assert.assertEquals(timestampSpec, inputRowSchema.getTimestampSpec());
+    Assert.assertEquals(dimensionsSpec.getDimensions(), inputRowSchema.getDimensionsSpec().getDimensions());
+    Assert.assertEquals(dimensionsSpec.getDimensionNames(), inputRowSchema.getDimensionsSpec().getDimensionNames());
+    Assert.assertEquals(ImmutableSet.of(), inputRowSchema.getMetricNames());
+  }
 }
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index fb378b766e..71e6add527 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -432,6 +432,11 @@
             <version>${aws.sdk.version}</version>
             <scope>runtime</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-java</artifactId>
+            <scope>runtime</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
index 463c4b2c0b..accaec0146 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -53,6 +53,7 @@ public class CompactionUtil
             null,
             null,
             null,
+            null,
             new MaxSizeSplitHintSpec(null, 1),
             new DynamicPartitionsSpec(maxRowsPerSegment, null),
             null,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 21e34ca8da..83e0ac9109 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.commons.io.IOUtils;
+import org.apache.datasketches.hll.TgtHllType;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.TaskState;
@@ -41,8 +42,12 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
+import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
@@ -90,7 +95,10 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
   private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
   private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json";
   private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
+  private static final String INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE = "/indexer/wikipedia_index_sketch_queries.json";
   private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_rollup_preserve_metric.json";
+  private static final String INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS = "/indexer/wikipedia_index_no_rollup_preserve_metric.json";
   private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
   private static final Period NO_SKIP_OFFSET = Period.seconds(0);
 
@@ -110,6 +118,226 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
   }
 
+  @Test
+  public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
+  {
+    // added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
+    loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+    // added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
+    loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 2 segments across 1 day...
+      verifySegmentsCount(2);
+      ArrayList<Object> nullList = new ArrayList<Object>();
+      nullList.add(null);
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 2,
+          "%%THETARESULT%%", 2.0,
+          "%%HLLRESULT%%", 2
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          null,
+          new AggregatorFactory[]{
+              new CountAggregatorFactory("count"),
+              new LongSumAggregatorFactory("sum_added", "added"),
+              new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
+              new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
+              new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
+          },
+          false
+      );
+      // should now only have 1 row after compaction
+      // added = null, count = 3, sum_added = 93
+      forceTriggerAutoCompaction(1);
+
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%QUANTILESRESULT%%", 3,
+          "%%THETARESULT%%", 3.0,
+          "%%HLLRESULT%%", 3
+      );
+      verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);
+
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify rollup segments do not get compacted again
+      forceTriggerAutoCompaction(1);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
+  @Test
+  public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception
+  {
+    // added = 31, count = null, sum_added = null
+    loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+    // added = 31, count = null, sum_added = null
+    loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 2 segments across 1 day...
+      verifySegmentsCount(2);
+      ArrayList<Object> nullList = new ArrayList<Object>();
+      nullList.add(null);
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          null,
+          new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
+          false
+      );
+      // should now only have 1 row after compaction
+      // added = null, count = 2, sum_added = 62
+      forceTriggerAutoCompaction(1);
+
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify rollup segments do not get compacted again
+      forceTriggerAutoCompaction(1);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
+  @Test
+  public void testAutoCompactionOnlyRowsWithMetricShouldPreserveExistingMetrics() throws Exception
+  {
+    // first load: added = null, count = 2, sum_added = 62
+    loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+    // second load of the same index task: added = null, count = 2, sum_added = 62
+    loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
+    try (final Closeable ignored = unloader(fullDatasourceName)) {
+      final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+      intervalsBeforeCompaction.sort(null);
+      // 2 segments across 1 day (one segment per load)
+      verifySegmentsCount(2);
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 2,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(null, null, true),
+          new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+          null,
+          new AggregatorFactory[] {new CountAggregatorFactory("count"), new LongSumAggregatorFactory("sum_added", "added")},
+          false
+      );
+      // should now only have 1 row after compaction
+      // existing metrics are combined, not recomputed: added = null, count = 4, sum_added = 124
+      forceTriggerAutoCompaction(1);
+
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "count",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(4))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+      queryAndResultFields = ImmutableMap.of(
+          "%%FIELD_TO_QUERY%%", "sum_added",
+          "%%EXPECTED_COUNT_RESULT%%", 1,
+          "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(124))))
+      );
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
+
+      verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+      checkCompactionIntervals(intervalsBeforeCompaction);
+
+      List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      // Verify rollup segments do not get compacted again: the completed task count must stay unchanged
+      forceTriggerAutoCompaction(1);
+      List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+      Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+    }
+  }
+
   @Test
   public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
   {
@@ -646,12 +874,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.MONTH, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
     loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       submitCompactionConfig(
           MAX_ROWS_PER_SEGMENT_COMPACTED,
           NO_SKIP_OFFSET,
@@ -667,12 +895,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       // does not have data on every week on the month
       forceTriggerAutoCompaction(3);
       // Make sure that no data is lost after compaction
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
       List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
       TaskResponseObject compactTask = null;
@@ -696,12 +924,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.WEEK, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
     loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       submitCompactionConfig(
           MAX_ROWS_PER_SEGMENT_COMPACTED,
           NO_SKIP_OFFSET,
@@ -714,12 +942,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       // we expect the compaction task's interval to align with the MONTH segmentGranularity (2013-08-01 to 2013-10-01)
       forceTriggerAutoCompaction(2);
       // Make sure that no data is lost after compaction
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
       List<TaskResponseObject> tasks = indexer.getCompleteTasksForDataSource(fullDatasourceName);
       TaskResponseObject compactTask = null;
@@ -742,12 +970,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
     loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       submitCompactionConfig(
           MAX_ROWS_PER_SEGMENT_COMPACTED,
           NO_SKIP_OFFSET,
@@ -755,12 +983,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           false
       );
       forceTriggerAutoCompaction(2);
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 1,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
       List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -778,12 +1006,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
     Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, true, ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
     loadData(INDEX_TASK_WITH_GRANULARITY_SPEC, specs);
     try (final Closeable ignored = unloader(fullDatasourceName)) {
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       submitCompactionConfig(
           MAX_ROWS_PER_SEGMENT_COMPACTED,
           NO_SKIP_OFFSET,
@@ -791,12 +1019,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
           false
       );
       forceTriggerAutoCompaction(2);
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 1,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
       List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -820,12 +1048,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifySegmentsCount(4);
 
       // Result is not rollup
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       // Compact and change dimension to only "language"
       submitCompactionConfig(
@@ -840,12 +1068,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       forceTriggerAutoCompaction(2);
 
       // Result should rollup on language dimension
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 1,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
       List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -868,12 +1096,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
       // Result is not rollup
       // For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       // Compact and filter with selector on dim "page" and value "Striker Eureka"
       submitCompactionConfig(
@@ -888,12 +1116,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       forceTriggerAutoCompaction(2);
 
       // For dim "page", result should only contain value "Striker Eureka"
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 1,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
       List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -915,12 +1143,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       verifySegmentsCount(4);
 
       // For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       // Compact and add longSum and doubleSum metrics
       submitCompactionConfig(
@@ -936,19 +1164,19 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
       // Result should be the same with the addition of new metrics, "double_sum_added" and "long_sum_added".
       // These new metrics should have the same value as the input field "added"
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "double_sum_added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
-      expectedResult = ImmutableMap.of(
+      queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "long_sum_added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57), ImmutableList.of(459))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
 
@@ -976,12 +1204,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
 
       // Result is not rollup
       // For dim "page", result has values "Gypsy Danger" and "Striker Eureka"
-      Map<String, Object> expectedResult = ImmutableMap.of(
+      Map<String, Object> queryAndResultFields = ImmutableMap.of(
           "%%FIELD_TO_QUERY%%", "added",
           "%%EXPECTED_COUNT_RESULT%%", 2,
           "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
       );
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       submitCompactionConfig(
           MAX_ROWS_PER_SEGMENT_COMPACTED,
@@ -994,11 +1222,11 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
       );
       // Compact the MONTH segment
       forceTriggerAutoCompaction(2);
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       // Compact the WEEK segment
       forceTriggerAutoCompaction(2);
-      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+      verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
 
       // Verify all task succeed
       List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
@@ -1133,6 +1361,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
             null,
             null,
             null,
+            null,
             new MaxSizeSplitHintSpec(null, 1),
             partitionsSpec,
             null,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index 7b7eefac6c..7c5afb1ffd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -81,6 +81,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
             null,
             null,
             null,
+            null,
             new MaxSizeSplitHintSpec(null, 1),
             newPartitionsSpec,
             null,
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json
new file mode 100644
index 0000000000..75e6b663a9
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_no_rollup_preserve_metric.json
@@ -0,0 +1,76 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "inline",
+        "data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas3\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n"
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      "appendToExisting": true
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "partitionsSpec": {
+        "type": "dynamic"
+      }
+    },
+    "dataSchema": {
+      "dataSource": "%%DATASOURCE%%",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "dimensionsSpec": {
+        "dimensions": [
+          "isRobot",
+          "language",
+          "flags",
+          "isUnpatrolled",
+          "page",
+          "diffUrl",
+          {
+            "type": "long",
+            "name": "added"
+          },
+          "comment",
+          {
+            "type": "long",
+            "name": "commentLength"
+          },
+          "isNew",
+          "isMinor",
+          {
+            "type": "long",
+            "name": "delta"
+          },
+          "isAnonymous",
+          "user",
+          {
+            "type": "long",
+            "name": "deltaBucket"
+          },
+          {
+            "type": "long",
+            "name": "deleted"
+          },
+          "namespace",
+          "cityName",
+          "countryName",
+          "regionIsoCode",
+          "metroCode",
+          "countryIsoCode",
+          "regionName"
+        ]
+      },
+      "granularitySpec": {
+        "queryGranularity": "hour",
+        "rollup": false,
+        "segmentGranularity": "day"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json
new file mode 100644
index 0000000000..dacedf0ef1
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_rollup_preserve_metric.json
@@ -0,0 +1,95 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "inline",
+        "data": "{\"isRobot\":true,\"language\":\"en\",\"timestamp\":\"2013-08-31T00:00:11.080Z\",\"flags\":\"NB\",\"isUnpatrolled\":false,\"page\":\"Salo Toraut\",\"diffUrl\":\"https://sv.wikipedia.org/w/index.php?oldid=36099284&rcid=89369918\",\"added\":31,\"comment\":\"Botskapande Indonesien omdirigering\",\"commentLength\":35,\"isNew\":true,\"isMinor\":false,\"delta\":31,\"isAnonymous\":false,\"user\":\"maytas1\",\"deltaBucket\":0.0,\"deleted\":0,\"namespace\":\"Main\"}\n{\"isRobot\" [...]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      "appendToExisting": true
+    },
+    "dataSchema": {
+      "granularitySpec": {
+        "segmentGranularity": "day",
+        "queryGranularity": "hour",
+        "rollup": true
+      },
+      "dataSource": "%%DATASOURCE%%",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "dimensionsSpec": {
+        "dimensions": [
+          "isRobot",
+          "language",
+          "flags",
+          "isUnpatrolled",
+          "page",
+          "diffUrl",
+          "comment",
+          "isNew",
+          "isMinor",
+          "isAnonymous",
+          "namespace"
+        ]
+      },
+      "metricsSpec": [
+        {
+          "name": "count",
+          "type": "count"
+        },
+        {
+          "name": "sum_added",
+          "type": "longSum",
+          "fieldName": "added"
+        },
+        {
+          "name": "sum_commentLength",
+          "type": "longSum",
+          "fieldName": "commentLength"
+        },
+        {
+          "name": "sum_delta",
+          "type": "longSum",
+          "fieldName": "delta"
+        },
+        {
+          "name": "sum_deltaBucket",
+          "type": "longSum",
+          "fieldName": "deltaBucket"
+        },
+        {
+          "name": "sum_deleted",
+          "type": "longSum",
+          "fieldName": "deleted"
+        },
+        {
+          "name": "thetaSketch",
+          "type": "thetaSketch",
+          "fieldName": "user"
+        },
+        {
+          "name": "quantilesDoublesSketch",
+          "type": "quantilesDoublesSketch",
+          "fieldName": "delta"
+        },
+        {
+          "name": "HLLSketchBuild",
+          "type": "HLLSketchBuild",
+          "fieldName": "user"
+        }
+      ]
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "partitionsSpec": {
+        "type": "dynamic"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json
new file mode 100644
index 0000000000..2b7a0625bc
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_sketch_queries.json
@@ -0,0 +1,49 @@
+[
+  {
+    "description": "timeseries, datasketch aggs, all",
+    "query":{
+      "queryType" : "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "granularity":"day",
+      "intervals":[
+        "2013-08-31T00:00/2013-09-01T00:00"
+      ],
+      "filter":null,
+      "aggregations":[
+        {
+          "type": "HLLSketchMerge",
+          "name": "approxCountHLL",
+          "fieldName": "HLLSketchBuild",
+          "lgK": 12,
+          "tgtHllType": "HLL_4",
+          "round": true
+        },
+        {
+          "type":"thetaSketch",
+          "name":"approxCountTheta",
+          "fieldName":"thetaSketch",
+          "size":16384,
+          "shouldFinalize":true,
+          "isInputThetaSketch":false,
+          "errorBoundsStdDev":null
+        },
+        {
+          "type":"quantilesDoublesSketch",
+          "name":"quantilesSketch",
+          "fieldName":"quantilesDoublesSketch",
+          "k":128
+        }
+      ]
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2013-08-31T00:00:00.000Z",
+        "result" : {
+          "quantilesSketch":%%QUANTILESRESULT%%,
+          "approxCountTheta":%%THETARESULT%%,
+          "approxCountHLL":%%HLLRESULT%%
+        }
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 5260669f8b..1269fe1e6b 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -34,6 +34,12 @@ public abstract class AppendableIndexBuilder
   protected boolean sortFacts = true;
   protected int maxRowCount = 0;
   protected long maxBytesInMemory = 0;
+  // When set to true, for any row that already has a metric (with the same name as one defined in metricSpec),
+  // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have
+  // the metric, then the metric aggregator is applied on the source column as usual. This should only be set for
+  // DruidInputSource since that is the only case where we can have existing metrics.
+  // This is currently only used by auto compaction and should not be used for anything else.
+  protected boolean preserveExistingMetrics = false;
   protected boolean useMaxMemoryEstimates = true;
 
   protected final Logger log = new Logger(this.getClass());
@@ -56,7 +62,7 @@ public abstract class AppendableIndexBuilder
   @VisibleForTesting
   public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
   {
-    return setSimpleTestingIndexSchema(null, metrics);
+    return setSimpleTestingIndexSchema(null, null, metrics);
   }
 
 
@@ -70,10 +76,15 @@ public abstract class AppendableIndexBuilder
    * @return this
    */
   @VisibleForTesting
-  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(
+      @Nullable Boolean rollup,
+      @Nullable Boolean preserveExistingMetrics,
+      final AggregatorFactory... metrics
+  )
   {
     IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
     this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : false;
     return this;
   }
 
@@ -107,6 +118,12 @@ public abstract class AppendableIndexBuilder
     return this;
   }
 
+  public AppendableIndexBuilder setPreserveExistingMetrics(final boolean preserveExistingMetrics)
+  {
+    this.preserveExistingMetrics = preserveExistingMetrics;
+    return this;
+  }
+
   public AppendableIndexBuilder setUseMaxMemoryEstimates(final boolean useMaxMemoryEstimates)
   {
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index b36ae6872e..c2e36cade3 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -227,6 +227,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
   private final AggregatorFactory[] metrics;
   private final boolean deserializeComplexMetrics;
   private final Metadata metadata;
+  protected final boolean preserveExistingMetrics;
 
   private final Map<String, MetricDesc> metricDescs;
 
@@ -257,12 +258,20 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
    * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input
    *                                  value for aggregators that return metrics other than float.
    * @param concurrentEventAdd        flag whether ot not adding of input rows should be thread-safe
+   * @param preserveExistingMetrics   When set to true, for any row that already has a metric
+   *                                  (with the same name defined in metricSpec), the metric aggregator in metricSpec
+   *                                  is skipped and the existing metric is unchanged. If the row does not already have
+   *                                  the metric, then the metric aggregator is applied on the source column as usual.
+   *                                  This should only be set for DruidInputSource since that is the only case where we
+   *                                  can have existing metrics. This is currently only used by auto compaction and
+   *                                  should not be used for anything else.
    * @param useMaxMemoryEstimates     true if max values should be used to estimate memory
    */
   protected IncrementalIndex(
       final IncrementalIndexSchema incrementalIndexSchema,
       final boolean deserializeComplexMetrics,
       final boolean concurrentEventAdd,
+      final boolean preserveExistingMetrics,
       final boolean useMaxMemoryEstimates
   )
   {
@@ -273,6 +282,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
     this.metrics = incrementalIndexSchema.getMetrics();
     this.rowTransformers = new CopyOnWriteArrayList<>();
     this.deserializeComplexMetrics = deserializeComplexMetrics;
+    this.preserveExistingMetrics = preserveExistingMetrics;
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
 
     this.timeAndMetricsColumnCapabilities = new HashMap<>();
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 493180db45..a28a0a620f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
@@ -52,6 +54,7 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 
 /**
  *
@@ -109,6 +112,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Nullable
   private volatile Map<String, ColumnSelectorFactory> selectors;
   @Nullable
+  private volatile Map<String, ColumnSelectorFactory> combiningAggSelectors;
+  @Nullable
   private String outOfRowsReason = null;
 
   OnheapIncrementalIndex(
@@ -118,10 +123,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
       boolean sortFacts,
       int maxRowCount,
       long maxBytesInMemory,
+      // preserveExistingMetrics should only be set true for DruidInputSource since that is the only case where we can have existing metrics
+      // This is currently only used by auto compaction and should not be used for anything else.
+      boolean preserveExistingMetrics,
       boolean useMaxMemoryEstimates
   )
   {
-    super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, useMaxMemoryEstimates);
+    super(incrementalIndexSchema, deserializeComplexMetrics, concurrentEventAdd, preserveExistingMetrics, useMaxMemoryEstimates);
     this.maxRowCount = maxRowCount;
     this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
     this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
@@ -182,6 +190,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   )
   {
     selectors = new HashMap<>();
+    combiningAggSelectors = new HashMap<>();
     for (AggregatorFactory agg : metrics) {
       selectors.put(
           agg.getName(),
@@ -190,6 +199,16 @@ public class OnheapIncrementalIndex extends IncrementalIndex
               concurrentEventAdd
           )
       );
+      if (preserveExistingMetrics) {
+        AggregatorFactory combiningAgg = agg.getCombiningFactory();
+        combiningAggSelectors.put(
+            combiningAgg.getName(),
+            new CachingColumnSelectorFactory(
+                makeColumnSelectorFactory(combiningAgg, rowSupplier, deserializeComplexMetrics),
+                concurrentEventAdd
+            )
+        );
+      }
     }
   }
 
@@ -214,7 +233,11 @@ public class OnheapIncrementalIndex extends IncrementalIndex
       long aggSizeDelta = doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
       totalSizeInBytes.addAndGet(useMaxMemoryEstimates ? 0 : aggSizeDelta);
     } else {
-      aggs = new Aggregator[metrics.length];
+      if (preserveExistingMetrics) {
+        aggs = new Aggregator[metrics.length * 2];
+      } else {
+        aggs = new Aggregator[metrics.length];
+      }
       long aggSizeForRow = factorizeAggs(metrics, aggs, rowContainer, row);
       aggSizeForRow += doAggregate(metrics, aggs, rowContainer, row, parseExceptionMessages);
 
@@ -279,23 +302,33 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   {
     long totalInitialSizeBytes = 0L;
     rowContainer.set(row);
-
     final long aggReferenceSize = Long.BYTES;
     for (int i = 0; i < metrics.length; i++) {
       final AggregatorFactory agg = metrics[i];
-
+      // Creates aggregators to aggregate from input into output fields
       if (useMaxMemoryEstimates) {
         aggs[i] = agg.factorize(selectors.get(agg.getName()));
       } else {
-        AggregatorAndSize aggregatorAndSize =
-            agg.factorizeWithSize(selectors.get(agg.getName()));
+        AggregatorAndSize aggregatorAndSize = agg.factorizeWithSize(selectors.get(agg.getName()));
         aggs[i] = aggregatorAndSize.getAggregator();
         totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
         totalInitialSizeBytes += aggReferenceSize;
       }
+      // Creates aggregators to combine already aggregated field
+      if (preserveExistingMetrics) {
+        if (useMaxMemoryEstimates) {
+          AggregatorFactory combiningAgg = agg.getCombiningFactory();
+          aggs[i + metrics.length] = combiningAgg.factorize(combiningAggSelectors.get(combiningAgg.getName()));
+        } else {
+          AggregatorFactory combiningAgg = agg.getCombiningFactory();
+          AggregatorAndSize aggregatorAndSize = combiningAgg.factorizeWithSize(combiningAggSelectors.get(combiningAgg.getName()));
+          aggs[i + metrics.length] = aggregatorAndSize.getAggregator();
+          totalInitialSizeBytes += aggregatorAndSize.getInitialSizeBytes();
+          totalInitialSizeBytes += aggReferenceSize;
+        }
+      }
     }
     rowContainer.set(null);
-
     return totalInitialSizeBytes;
   }
 
@@ -315,10 +348,14 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   )
   {
     rowContainer.set(row);
-
     long totalIncrementalBytes = 0L;
-    for (int i = 0; i < aggs.length; i++) {
-      final Aggregator agg = aggs[i];
+    for (int i = 0; i < metrics.length; i++) {
+      final Aggregator agg;
+      if (preserveExistingMetrics && row instanceof MapBasedRow && ((MapBasedRow) row).getEvent().containsKey(metrics[i].getName())) {
+        agg = aggs[i + metrics.length];
+      } else {
+        agg = aggs[i];
+      }
       synchronized (agg) {
         try {
           if (useMaxMemoryEstimates) {
@@ -329,8 +366,13 @@ public class OnheapIncrementalIndex extends IncrementalIndex
         }
         catch (ParseException e) {
           // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
-          log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
-          parseExceptionsHolder.add(e.getMessage());
+          if (preserveExistingMetrics) {
+            log.warn(e, "Failing ingestion as preserveExistingMetrics is enabled but selector of aggregator[%s] recieved incompatible type.", metrics[i].getName());
+            throw e;
+          } else {
+            log.debug(e, "Encountered parse error, skipping aggregator[%s].", metrics[i].getName());
+            parseExceptionsHolder.add(e.getMessage());
+          }
         }
       }
     }
@@ -410,31 +452,35 @@ public class OnheapIncrementalIndex extends IncrementalIndex
   @Override
   public float getMetricFloatValue(int rowOffset, int aggOffset)
   {
-    return concurrentGet(rowOffset)[aggOffset].getFloat();
+    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
   }
 
   @Override
   public long getMetricLongValue(int rowOffset, int aggOffset)
   {
-    return concurrentGet(rowOffset)[aggOffset].getLong();
+    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
   }
 
   @Override
   public Object getMetricObjectValue(int rowOffset, int aggOffset)
   {
-    return concurrentGet(rowOffset)[aggOffset].get();
+    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::get);
   }
 
   @Override
   protected double getMetricDoubleValue(int rowOffset, int aggOffset)
   {
-    return concurrentGet(rowOffset)[aggOffset].getDouble();
+    return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
   }
 
   @Override
   public boolean isNull(int rowOffset, int aggOffset)
   {
-    return concurrentGet(rowOffset)[aggOffset].isNull();
+    if (preserveExistingMetrics) {
+      return concurrentGet(rowOffset)[aggOffset].isNull() && concurrentGet(rowOffset)[aggOffset + getMetricAggs().length].isNull();
+    } else {
+      return concurrentGet(rowOffset)[aggOffset].isNull();
+    }
   }
 
   @Override
@@ -475,8 +521,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex
               }
 
               Aggregator[] aggs = getAggsForRow(rowOffset);
-              for (int i = 0; i < aggs.length; ++i) {
-                theVals.put(metrics[i].getName(), aggs[i].get());
+              int aggLength = preserveExistingMetrics ? aggs.length / 2 : aggs.length;
+              for (int i = 0; i < aggLength; ++i) {
+                theVals.put(metrics[i].getName(), getMetricHelper(metrics, aggs, i, Aggregator::get));
               }
 
               if (postAggs != null) {
@@ -492,6 +539,40 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     }
   }
 
+  /**
+   * Apply the getMetricTypeFunction function to retrieve the aggregated value given the list of aggregators and offset.
+   * If preserveExistingMetrics flag is set, then this method will combine values from two aggregators, the aggregator
+   * for aggregating from input into output field and the aggregator for combining already aggregated field, as needed
+   */
+  private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+  {
+    if (preserveExistingMetrics) {
+      // Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
+      // from two aggregators, the aggregator for aggregating from input into output field and the aggregator
+      // for combining already aggregated field
+      if (aggs[aggOffset].isNull()) {
+        // If the aggregator for aggregating from input into output field is null, then we get the value from the
+        // aggregator that we use for combining already aggregated field
+        return getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
+      } else if (aggs[aggOffset + metrics.length].isNull()) {
+        // If the aggregator for combining already aggregated field is null, then we get the value from the
+        // aggregator for aggregating from input into output field
+        return getMetricTypeFunction.apply(aggs[aggOffset]);
+      } else {
+        // Since both aggregators are not null and contain values, we will have to retrieve the values from both
+        // aggregators and combine them
+        AggregatorFactory aggregatorFactory = metrics[aggOffset];
+        T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
+        T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
+        return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
+      }
+    } else {
+      // If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
+      // given aggOffset
+      return getMetricTypeFunction.apply(aggs[aggOffset]);
+    }
+  }
+
   /**
    * Clear out maps to allow GC
    * NOTE: This is NOT thread-safe with add... so make sure all the adding is DONE before closing
@@ -506,6 +587,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     if (selectors != null) {
       selectors.clear();
     }
+    if (combiningAggSelectors != null) {
+      combiningAggSelectors.clear();
+    }
   }
 
   /**
@@ -571,6 +655,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex
           sortFacts,
           maxRowCount,
           maxBytesInMemory,
+          preserveExistingMetrics,
           useMaxMemoryEstimates
       );
     }
@@ -578,12 +663,39 @@ public class OnheapIncrementalIndex extends IncrementalIndex
 
   public static class Spec implements AppendableIndexSpec
   {
+    private static final boolean DEFAULT_PRESERVE_EXISTING_METRICS = false;
     public static final String TYPE = "onheap";
 
+    // When set to true, for any row that already has metric (with the same name defined in metricSpec),
+    // the metric aggregator in metricSpec is skipped and the existing metric is unchanged. If the row does not already have
+    // the metric, then the metric aggregator is applied on the source column as usual. This should only be set for
+    // DruidInputSource since that is the only case where we can have existing metrics.
+    // This is currently only used by auto compaction and should not be used for anything else.
+    final boolean preserveExistingMetrics;
+
+    public Spec()
+    {
+      this.preserveExistingMetrics = DEFAULT_PRESERVE_EXISTING_METRICS;
+    }
+
+    @JsonCreator
+    public Spec(
+        final @JsonProperty("preserveExistingMetrics") @Nullable Boolean preserveExistingMetrics
+    )
+    {
+      this.preserveExistingMetrics = preserveExistingMetrics != null ? preserveExistingMetrics : DEFAULT_PRESERVE_EXISTING_METRICS;
+    }
+
+    @JsonProperty
+    public boolean isPreserveExistingMetrics()
+    {
+      return preserveExistingMetrics;
+    }
+
     @Override
     public AppendableIndexBuilder builder()
     {
-      return new Builder();
+      return new Builder().setPreserveExistingMetrics(preserveExistingMetrics);
     }
 
     @Override
@@ -596,15 +708,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex
     }
 
     @Override
-    public boolean equals(Object that)
+    public boolean equals(Object o)
     {
-      return that.getClass().equals(this.getClass());
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Spec spec = (Spec) o;
+      return preserveExistingMetrics == spec.preserveExistingMetrics;
     }
 
     @Override
     public int hashCode()
     {
-      return Objects.hash(this.getClass());
+      return Objects.hash(preserveExistingMetrics);
     }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 646f1d66bf..3257ec6d49 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -37,6 +38,7 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.QueryPlus;
@@ -70,6 +72,7 @@ import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -92,23 +95,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class IncrementalIndexTest extends InitializedNullHandlingTest
 {
   public final IncrementalIndexCreator indexCreator;
+  private final boolean isPreserveExistingMetrics;
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
   @Rule
   public final CloserRule closer = new CloserRule(false);
 
-  public IncrementalIndexTest(String indexType, String mode) throws JsonProcessingException
+  public IncrementalIndexTest(String indexType, String mode, boolean isPreserveExistingMetrics) throws JsonProcessingException
   {
+    this.isPreserveExistingMetrics = isPreserveExistingMetrics;
     indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
-        .setSimpleTestingIndexSchema("rollup".equals(mode), (AggregatorFactory[]) args[0])
+        .setSimpleTestingIndexSchema("rollup".equals(mode), isPreserveExistingMetrics, (AggregatorFactory[]) args[0])
         .setMaxRowCount(1_000_000)
         .build()
     ));
   }
 
-  @Parameterized.Parameters(name = "{index}: {0}, {1}")
+  @Parameterized.Parameters(name = "{index}: {0}, {1}, {2}")
   public static Collection<?> constructorFeeder()
   {
-    return IncrementalIndexCreator.indexTypeCartesianProduct(ImmutableList.of("rollup", "plain"));
+    return IncrementalIndexCreator.indexTypeCartesianProduct(
+        ImmutableList.of("rollup", "plain"),
+        ImmutableList.of(true, false)
+    );
   }
 
   public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
@@ -155,7 +165,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
     }
 
     return new OnheapIncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(false, aggregatorFactories)
+        .setSimpleTestingIndexSchema(false, false, aggregatorFactories)
         .setMaxRowCount(1000000)
         .build();
   }
@@ -721,4 +731,269 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
 
     Assert.assertEquals(2, index.size());
   }
+
+  @Test
+  public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetric() throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new LongSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 2)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rolled up into one row
+        Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
+        Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 4 rows
+        if (rowCount == 1 || rowCount == 2) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+        } else {
+          if (isPreserveExistingMetrics) {
+            Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
+            Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
+          } else {
+            Assert.assertEquals(1, row.getMetric("count").intValue());
+            // The row does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+            Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new LongSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", 5)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rolled up into one row
+        Assert.assertEquals(isPreserveExistingMetrics ? 5 : 2, row.getMetric("count").intValue());
+        Assert.assertEquals(isPreserveExistingMetrics ? 9 : 3, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 2 rows
+        if (rowCount == 1) {
+          if (isPreserveExistingMetrics) {
+            Assert.assertEquals(2, row.getMetric("count").intValue());
+            Assert.assertEquals(4, row.getMetric("sum_of_x").intValue());
+          } else {
+            Assert.assertEquals(1, row.getMetric("count").intValue());
+            // The row does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+            Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+          }
+        } else {
+          Assert.assertEquals(isPreserveExistingMetrics ? 3 : 1, row.getMetric("count").intValue());
+          Assert.assertEquals(isPreserveExistingMetrics ? 5 : 3, row.getMetric("sum_of_x").intValue());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSchemaRollupWithRowsWithNoMetrics() throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new LongSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "x", 3)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rollup into one row
+        Assert.assertEquals(2, row.getMetric("count").intValue());
+        Assert.assertEquals(7, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 2 rows
+        if (rowCount == 1) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(4, row.getMetric("sum_of_x").intValue());
+        } else {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSchemaRollupWithRowWithMixedTypeMetrics() throws IndexSizeExceededException
+  {
+    if (isPreserveExistingMetrics) {
+      expectedException.expect(ParseException.class);
+    }
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new LongSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", "not a number 1", "sum_of_x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "host", "host", "count", 3, "x", 3, "sum_of_x", "not a number 2")
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rolled up into one row
+        Assert.assertEquals(2, row.getMetric("count").intValue());
+        Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 2 rows
+        if (rowCount == 1) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          // The row does not have the dim "x", hence metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
+          Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0L, row.getMetric("sum_of_x"));
+        } else {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(3, row.getMetric("sum_of_x").intValue());
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSchemaRollupWithRowsWithNonRolledUpSameColumnName() throws IndexSizeExceededException
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new LongSumAggregatorFactory("sum_of_x", "x")
+    };
+    final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 4)
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            1481871600000L,
+            Arrays.asList("name", "host"),
+            ImmutableMap.of("name", "name1", "sum_of_x", 100, "x", 3)
+        )
+    );
+
+    Assert.assertEquals(index.isRollup() ? 1 : 2, index.size());
+    Iterator<Row> iterator = index.iterator();
+    int rowCount = 0;
+    while (iterator.hasNext()) {
+      rowCount++;
+      Row row = iterator.next();
+      Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
+      if (index.isRollup()) {
+        // All rows are rolled up into one row
+        Assert.assertEquals(2, row.getMetric("count").intValue());
+        Assert.assertEquals(isPreserveExistingMetrics ? 200 : 7, row.getMetric("sum_of_x").intValue());
+      } else {
+        // We still have 2 rows
+        if (rowCount == 1) {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(isPreserveExistingMetrics ? 100 : 4, row.getMetric("sum_of_x").intValue());
+        } else {
+          Assert.assertEquals(1, row.getMetric("count").intValue());
+          Assert.assertEquals(isPreserveExistingMetrics ? 100 : 3, row.getMetric("sum_of_x").intValue());
+        }
+      }
+    }
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 5200ada8ef..096d5a0aef 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -62,7 +62,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
   public IncrementalIndexAdapterTest(String indexType) throws JsonProcessingException
   {
     indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
-        .setSimpleTestingIndexSchema("rollup".equals(args[0]), new CountAggregatorFactory("count"))
+        .setSimpleTestingIndexSchema("rollup".equals(args[0]), null, new CountAggregatorFactory("count"))
         .setMaxRowCount(1_000_000)
         .build()
     ));
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
index b33d16b07d..02d43e2750 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java
@@ -124,6 +124,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
           sortFacts,
           maxRowCount,
           maxBytesInMemory,
+          false,
           true
       );
     }
@@ -147,6 +148,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
           true,
           maxRowCount,
           maxBytesInMemory,
+          false,
           true
       );
     }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
index 3e3d215e58..f35257e244 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQueryTuningConfig.java
@@ -25,6 +25,8 @@ import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.joda.time.Duration;
@@ -72,6 +74,8 @@ public class ClientCompactionTaskQueryTuningConfig
   private final Integer maxNumSegmentsToMerge;
   @Nullable
   private final Integer totalNumMergeTasks;
+  @Nullable
+  private final AppendableIndexSpec appendableIndexSpec;
 
   public static ClientCompactionTaskQueryTuningConfig from(
       @Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
@@ -81,6 +85,7 @@ public class ClientCompactionTaskQueryTuningConfig
     if (userCompactionTaskQueryTuningConfig == null) {
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
+          new OnheapIncrementalIndex.Spec(true),
           null,
           null,
           null,
@@ -100,8 +105,12 @@ public class ClientCompactionTaskQueryTuningConfig
           null
       );
     } else {
+      AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
+                                                     ? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
+                                                     : new OnheapIncrementalIndex.Spec(true);
       return new ClientCompactionTaskQueryTuningConfig(
           maxRowsPerSegment,
+          appendableIndexSpecToUse,
           userCompactionTaskQueryTuningConfig.getMaxRowsInMemory(),
           userCompactionTaskQueryTuningConfig.getMaxBytesInMemory(),
           userCompactionTaskQueryTuningConfig.getMaxTotalRows(),
@@ -126,6 +135,7 @@ public class ClientCompactionTaskQueryTuningConfig
   @JsonCreator
   public ClientCompactionTaskQueryTuningConfig(
       @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -146,6 +156,7 @@ public class ClientCompactionTaskQueryTuningConfig
   )
   {
     this.maxRowsPerSegment = maxRowsPerSegment;
+    this.appendableIndexSpec = appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
     this.maxTotalRows = maxTotalRows;
@@ -306,6 +317,13 @@ public class ClientCompactionTaskQueryTuningConfig
     return totalNumMergeTasks;
   }
 
+  @JsonProperty
+  @Nullable
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -333,7 +351,8 @@ public class ClientCompactionTaskQueryTuningConfig
            Objects.equals(chatHandlerTimeout, that.chatHandlerTimeout) &&
            Objects.equals(chatHandlerNumRetries, that.chatHandlerNumRetries) &&
            Objects.equals(maxNumSegmentsToMerge, that.maxNumSegmentsToMerge) &&
-           Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks);
+           Objects.equals(totalNumMergeTasks, that.totalNumMergeTasks) &&
+           Objects.equals(appendableIndexSpec, that.appendableIndexSpec);
   }
 
   @Override
@@ -357,7 +376,8 @@ public class ClientCompactionTaskQueryTuningConfig
         chatHandlerTimeout,
         chatHandlerNumRetries,
         maxNumSegmentsToMerge,
-        totalNumMergeTasks
+        totalNumMergeTasks,
+        appendableIndexSpec
     );
   }
 
@@ -383,6 +403,7 @@ public class ClientCompactionTaskQueryTuningConfig
            ", chatHandlerNumRetries=" + chatHandlerNumRetries +
            ", maxNumSegmentsToMerge=" + maxNumSegmentsToMerge +
            ", totalNumMergeTasks=" + totalNumMergeTasks +
+           ", appendableIndexSpec=" + appendableIndexSpec +
            '}';
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
index 38f559036b..07ab96c343 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfig.java
@@ -26,6 +26,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 
@@ -36,6 +37,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue
   @JsonCreator
   public UserCompactionTaskQueryTuningConfig(
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
       @JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
@@ -56,6 +58,7 @@ public class UserCompactionTaskQueryTuningConfig extends ClientCompactionTaskQue
   {
     super(
         null,
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxTotalRows,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index c643da154c..6725dd3428 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Duration;
@@ -123,6 +124,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
         null,
         new Period(3600),
         new UserCompactionTaskQueryTuningConfig(
+            null,
             null,
             null,
             10000L,
@@ -170,6 +172,7 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
         10000,
         new Period(3600),
         new UserCompactionTaskQueryTuningConfig(
+            null,
             null,
             null,
             10000L,
@@ -213,6 +216,47 @@ public class DataSourceCompactionConfigTest extends InitializedNullHandlingTest
   {
     final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
         40000,
+        null,
+        2000L,
+        null,
+        new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
+        new DynamicPartitionsSpec(1000, 20000L),
+        new IndexSpec(
+            new DefaultBitmapSerdeFactory(),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.LZF,
+            LongEncodingStrategy.LONGS
+        ),
+        new IndexSpec(
+            new DefaultBitmapSerdeFactory(),
+            CompressionStrategy.LZ4,
+            CompressionStrategy.UNCOMPRESSED,
+            LongEncodingStrategy.AUTO
+        ),
+        2,
+        1000L,
+        TmpFileSegmentWriteOutMediumFactory.instance(),
+        100,
+        5,
+        1000L,
+        new Duration(3000L),
+        7,
+        1000,
+        100
+    );
+
+    final String json = OBJECT_MAPPER.writeValueAsString(tuningConfig);
+    final UserCompactionTaskQueryTuningConfig fromJson =
+        OBJECT_MAPPER.readValue(json, UserCompactionTaskQueryTuningConfig.class);
+    Assert.assertEquals(tuningConfig, fromJson);
+  }
+
+  @Test
+  public void testSerdeUserCompactionTuningConfigWithAppendableIndexSpec() throws IOException
+  {
+    final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
+        40000,
+        new OnheapIncrementalIndex.Spec(true),
         2000L,
         null,
         new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), null),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
index 7aa6540abf..dc52f78422 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskQueryTuningConfigTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.BitmapSerde.DefaultBitmapSerdeFactory;
 import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.junit.Assert;
@@ -60,6 +61,7 @@ public class UserCompactionTaskQueryTuningConfigTest
             null,
             null,
             null,
+            null,
             null
         );
     final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -75,6 +77,7 @@ public class UserCompactionTaskQueryTuningConfigTest
   {
     final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
         40000,
+        new OnheapIncrementalIndex.Spec(true),
         2000L,
         null,
         new SegmentsSplitHintSpec(new HumanReadableBytes(42L), null),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index b25ebbaaf1..8cc8fad3f1 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -722,6 +722,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -786,6 +787,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -844,6 +846,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -902,6 +905,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -968,6 +972,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1029,6 +1034,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1089,6 +1095,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1190,6 +1197,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1315,6 +1323,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1376,6 +1385,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1441,6 +1451,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1587,6 +1598,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -1683,6 +1695,7 @@ public class CompactSegmentsTest
                 null,
                 null,
                 null,
+                null,
                 partitionsSpec,
                 null,
                 null,
@@ -2003,6 +2016,7 @@ public class CompactSegmentsTest
                   null,
                   null,
                   null,
+                  null,
                   partitionsSpec,
                   null,
                   null,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 3ad596347a..fd7549c629 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -120,6 +120,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new DynamicPartitionsSpec(null, null),
             null,
             null,
@@ -163,6 +164,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new DynamicPartitionsSpec(null, 1000L),
             null,
             null,
@@ -206,6 +208,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new DynamicPartitionsSpec(100, 1000L),
             null,
             null,
@@ -245,6 +248,7 @@ public class NewestSegmentFirstIteratorTest
         100,
         null,
         new UserCompactionTaskQueryTuningConfig(
+            null,
             null,
             null,
             1000L,
@@ -292,6 +296,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new DynamicPartitionsSpec(null, null),
             null,
             null,
@@ -331,6 +336,7 @@ public class NewestSegmentFirstIteratorTest
         null,
         null,
         new UserCompactionTaskQueryTuningConfig(
+            null,
             null,
             null,
             1000L,
@@ -378,6 +384,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new HashedPartitionsSpec(null, 10, ImmutableList.of("dim")),
             null,
             null,
@@ -421,6 +428,7 @@ public class NewestSegmentFirstIteratorTest
             null,
             null,
             null,
+            null,
             new SingleDimensionPartitionsSpec(10000, null, "dim", false),
             null,
             null,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index a29bd9eceb..2026873103 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -45,10 +45,12 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskTransformConfig;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -1490,6 +1492,96 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIteratorDoesNotReturnSegmentWithChangingAppendableIndexSpec()
+  {
+    NullHandling.initializeForTests();
+    PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+    final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+        new SegmentGenerateSpec(
+            Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+            new Period("P1D"),
+            null,
+            new CompactionState(
+                partitionsSpec,
+                null,
+                null,
+                null,
+                mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {}),
+                null
+            )
+        )
+    );
+
+    CompactionSegmentIterator iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            null,
+            null,
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                new OnheapIncrementalIndex.Spec(true),
+                null,
+                1000L,
+                null,
+                partitionsSpec,
+                new IndexSpec(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    Assert.assertFalse(iterator.hasNext());
+
+    iterator = policy.reset(
+        ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+            130000,
+            new Period("P0D"),
+            null,
+            null,
+            null,
+            new UserCompactionTaskQueryTuningConfig(
+                null,
+                new OnheapIncrementalIndex.Spec(false),
+                null,
+                1000L,
+                null,
+                partitionsSpec,
+                new IndexSpec(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+            ),
+            null
+        )),
+        ImmutableMap.of(DATA_SOURCE, timeline),
+        Collections.emptyMap()
+    );
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   private static void assertCompactSegmentIntervals(
       CompactionSegmentIterator iterator,
       Period segmentPeriod,
@@ -1586,7 +1678,19 @@ public class NewestSegmentFirstPolicyTest
       UserCompactionTaskGranularityConfig granularitySpec
   )
   {
-    return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null);
+    return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null, null, null, null);
+  }
+
+  private DataSourceCompactionConfig createCompactionConfig(
+      long inputSegmentSizeBytes,
+      Period skipOffsetFromLatest,
+      UserCompactionTaskGranularityConfig granularitySpec,
+      UserCompactionTaskDimensionsConfig dimensionsSpec,
+      UserCompactionTaskTransformConfig transformSpec,
+      AggregatorFactory[] metricsSpec
+  )
+  {
+    return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, dimensionsSpec, transformSpec, null, metricsSpec);
   }
 
   private DataSourceCompactionConfig createCompactionConfig(
@@ -1595,6 +1699,7 @@ public class NewestSegmentFirstPolicyTest
       UserCompactionTaskGranularityConfig granularitySpec,
       UserCompactionTaskDimensionsConfig dimensionsSpec,
       UserCompactionTaskTransformConfig transformSpec,
+      UserCompactionTaskQueryTuningConfig tuningConfig,
       AggregatorFactory[] metricsSpec
   )
   {
@@ -1604,7 +1709,7 @@ public class NewestSegmentFirstPolicyTest
         inputSegmentSizeBytes,
         null,
         skipOffsetFromLatest,
-        null,
+        tuningConfig,
         granularitySpec,
         dimensionsSpec,
         metricsSpec,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org