You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/10/24 01:34:52 UTC

[druid] branch master updated: Configurable Index Type (#10335)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a2903  Configurable Index Type (#10335)
f3a2903 is described below

commit f3a2903218573f5d336b082b1c9b8a60a19e8c54
Author: Liran Funaro <li...@gmail.com>
AuthorDate: Sat Oct 24 04:34:26 2020 +0300

    Configurable Index Type (#10335)
    
    * Introduce a Configurable Index Type
    
    * Change to @UnstableApi
    
    * Add AppendableIndexSpecTest
    
    * Update doc
    
    * Add spelling exception
    
    * Add tests coverage
    
    * Revert some of the changes to reduce diff
    
    * Minor fixes
    
    * Update getMaxBytesInMemoryOrDefault() comment
    
    * Fix typo, remove redundant interface
    
    * Remove off-heap spec (postponed to a later PR)
    
    * Add javadocs to AppendableIndexSpec
    
    * Describe testCreateTask()
    
    * Add tests for AppendableIndexSpec within TuningConfig
    
    * Modify hashCode() to conform with equals()
    
    * Add comment where building incremental-index
    
    * Add "EqualsVerifier" tests
    
    * Revert some of the API back to AppenderatorConfig
    
    * Don't use multi-line comments
    
    * Remove knob documentation (deferred)
---
 .../MaterializedViewSupervisorSpec.java            |   1 +
 .../MaterializedViewSupervisorTest.java            |  32 +++++-
 extensions-core/kafka-indexing-service/pom.xml     |   5 +
 .../indexing/kafka/KafkaIndexTaskTuningConfig.java |   4 +
 .../supervisor/KafkaSupervisorTuningConfig.java    |   8 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   1 +
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |  21 +++-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   5 +
 .../KafkaSupervisorTuningConfigTest.java           |   6 +-
 .../TestModifiedKafkaIndexTaskTuningConfig.java    |   3 +
 extensions-core/kinesis-indexing-service/pom.xml   |   5 +
 .../kinesis/KinesisIndexTaskTuningConfig.java      |   4 +
 .../supervisor/KinesisSupervisorTuningConfig.java  |   5 +
 .../kinesis/KinesisIndexTaskSerdeTest.java         |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   1 +
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  21 +++-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   3 +
 .../KinesisSupervisorTuningConfigTest.java         |   6 +-
 .../TestModifiedKinesisIndexTaskTuningConfig.java  |   4 +
 .../apache/druid/indexer/HadoopTuningConfig.java   |  18 ++-
 .../apache/druid/indexer/IndexGeneratorJob.java    |   8 +-
 .../druid/indexer/BatchDeltaIngestionTest.java     |   1 +
 .../indexer/DetermineHashedPartitionsJobTest.java  |   1 +
 .../druid/indexer/DeterminePartitionsJobTest.java  |   1 +
 .../indexer/HadoopDruidIndexerConfigTest.java      |   1 +
 .../druid/indexer/HadoopTuningConfigTest.java      |   3 +
 .../druid/indexer/IndexGeneratorJobTest.java       |   1 +
 .../org/apache/druid/indexer/JobHelperTest.java    |   1 +
 .../indexer/path/GranularityPathSpecTest.java      |   1 +
 .../index/RealtimeAppenderatorTuningConfig.java    |  19 ++-
 .../druid/indexing/common/task/CompactionTask.java |   1 +
 .../druid/indexing/common/task/IndexTask.java      |  25 +++-
 .../parallel/ParallelIndexSupervisorTask.java      |   1 +
 .../batch/parallel/ParallelIndexTuningConfig.java  |   5 +
 .../SeekableStreamIndexTaskTuningConfig.java       |  19 ++-
 .../AppenderatorDriverRealtimeIndexTaskTest.java   |   1 +
 .../task/ClientCompactionTaskQuerySerdeTest.java   |   1 +
 .../common/task/CompactionTaskRunTest.java         |   1 +
 .../indexing/common/task/CompactionTaskTest.java   |   6 +
 .../indexing/common/task/IndexTaskSerdeTest.java   |   6 +
 .../druid/indexing/common/task/IndexTaskTest.java  |  13 +++
 .../common/task/RealtimeIndexTaskTest.java         |   1 +
 .../druid/indexing/common/task/TaskSerdeTest.java  |   3 +
 .../AbstractParallelIndexSupervisorTaskTest.java   |   2 +
 .../ParallelIndexSupervisorTaskKillTest.java       |   1 +
 .../ParallelIndexSupervisorTaskResourceTest.java   |   1 +
 .../ParallelIndexSupervisorTaskSerdeTest.java      |   1 +
 .../parallel/ParallelIndexSupervisorTaskTest.java  |   1 +
 .../parallel/ParallelIndexTestingFactory.java      |   1 +
 .../parallel/ParallelIndexTuningConfigTest.java    |  16 +++
 .../parallel/SinglePhaseParallelIndexingTest.java  |   1 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   5 +
 .../SeekableStreamSupervisorStateTest.java         |   1 +
 .../druid/jackson/AppendableIndexModule.java       |  36 +++---
 .../apache/druid/jackson/DefaultObjectMapper.java  |   1 +
 .../druid/query/groupby/GroupByQueryHelper.java    |  31 ++---
 .../incremental/AppendableIndexBuilder.java        | 128 +++++++++++++++++++++
 .../segment/incremental/AppendableIndexSpec.java   |  30 ++---
 .../segment/incremental/IncrementalIndex.java      | 109 ++++--------------
 .../incremental/OffheapIncrementalIndex.java       |  47 ++++++--
 .../incremental/OnheapIncrementalIndex.java        |  49 ++++++++
 .../druid/segment/data/IncrementalIndexTest.java   |  33 +++---
 .../segment/incremental/IncrementalIndexTest.java  |   9 +-
 .../segment/indexing/RealtimeTuningConfig.java     |  19 ++-
 .../druid/segment/indexing/TuningConfig.java       |  37 +++++-
 .../realtime/appenderator/AppenderatorConfig.java  |   8 +-
 .../realtime/appenderator/AppenderatorImpl.java    |   5 +-
 .../UnifiedIndexerAppenderatorsManager.java        |   7 ++
 .../segment/realtime/plumber/RealtimePlumber.java  |   7 +-
 .../druid/segment/realtime/plumber/Sink.java       |  12 +-
 .../segment/indexing/RealtimeTuningConfigTest.java |   6 +-
 .../appenderator/AppenderatorPlumberTest.java      |   1 +
 .../realtime/appenderator/AppenderatorTester.java  |   1 +
 .../DefaultOfflineAppenderatorFactoryTest.java     |   1 +
 .../plumber/RealtimePlumberSchoolTest.java         |  11 +-
 .../druid/segment/realtime/plumber/SinkTest.java   |   9 +-
 .../druid/cli/validate/DruidJsonValidatorTest.java |   1 +
 77 files changed, 687 insertions(+), 215 deletions(-)

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index dd38513..5388388 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -181,6 +181,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
         tuningConfig.getShardSpecs(),
         tuningConfig.getIndexSpec(),
         tuningConfig.getIndexSpecForIntermediatePersists(),
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getRowFlushBoundary(),
         tuningConfig.getMaxBytesInMemory(),
         tuningConfig.isLeaveIntermediate(),
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 9694c2f..8dc6e53 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -83,6 +83,7 @@ public class MaterializedViewSupervisorTest
   private TaskQueue taskQueue;
   private MaterializedViewSupervisor supervisor;
   private String derivativeDatasourceName;
+  private MaterializedViewSupervisorSpec spec;
   private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
 
   @Before
@@ -103,7 +104,7 @@ public class MaterializedViewSupervisorTest
     taskQueue = EasyMock.createMock(TaskQueue.class);
     taskQueue.start();
     objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
-    MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
+    spec = new MaterializedViewSupervisorSpec(
         "base",
         new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
         new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
@@ -317,6 +318,35 @@ public class MaterializedViewSupervisorTest
 
   }
 
+  /**
+   * Verifies that creating HadoopIndexTask compleates without raising exception.
+   */
+  @Test
+  public void testCreateTask()
+  {
+    List<DataSegment> baseSegments = Collections.singletonList(
+        new DataSegment(
+            "base",
+            Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+            "2015-01-03",
+            ImmutableMap.of(),
+            ImmutableList.of("dim1", "dim2"),
+            ImmutableList.of("m1"),
+            new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
+            9,
+            1024
+        )
+    );
+
+    HadoopIndexTask task = spec.createTask(
+        Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+        "2015-01-03",
+        baseSegments
+    );
+
+    Assert.assertNotNull(task);
+  }
+
   @Test
   public void testSuspendedDoesntRun()
   {
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index d192b68..7cc1ebf 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -203,6 +203,11 @@
       <artifactId>assertj-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>nl.jqno.equalsverifier</groupId>
+      <artifactId>equalsverifier</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index f23ee7b..8dc9a2f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
@@ -33,6 +34,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
 {
   @JsonCreator
   public KafkaIndexTaskTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -55,6 +57,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
@@ -81,6 +84,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
   public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
   {
     return new KafkaIndexTaskTuningConfig(
+        getAppendableIndexSpec(),
         getMaxRowsInMemory(),
         getMaxBytesInMemory(),
         getMaxRowsPerSegment(),
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 2dca7f6..a9e0bfe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.indexing.TuningConfigs;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -67,11 +67,13 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
 
   public KafkaSupervisorTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -100,6 +102,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
@@ -193,7 +196,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
            "maxRowsInMemory=" + getMaxRowsInMemory() +
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
-           ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
+           ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
            ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
@@ -219,6 +222,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
   public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
   {
     return new KafkaIndexTaskTuningConfig(
+        getAppendableIndexSpec(),
         getMaxRowsInMemory(),
         getMaxBytesInMemory(),
         getMaxRowsPerSegment(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index f4098fc..9b53735 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2497,6 +2497,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
   ) throws JsonProcessingException
   {
     final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
+        null,
         1000,
         null,
         maxRowsPerSegment,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 3a930db..8888a69 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.indexing.kafka;
 
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
 import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.joda.time.Period;
 import org.junit.Assert;
@@ -60,6 +62,7 @@ public class KafkaIndexTaskTuningConfigTest
     );
 
     Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertNull(config.getMaxTotalRows());
@@ -85,7 +88,8 @@ public class KafkaIndexTaskTuningConfigTest
                      + "  \"reportParseExceptions\": true,\n"
                      + "  \"handoffConditionTimeout\": 100,\n"
                      + "  \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
-                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+                     + "  \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
                      + "}";
 
     KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
@@ -99,6 +103,7 @@ public class KafkaIndexTaskTuningConfigTest
     );
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertNotEquals(null, config.getMaxTotalRows());
@@ -115,6 +120,7 @@ public class KafkaIndexTaskTuningConfigTest
   public void testConvert()
   {
     KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+        null,
         1,
         null,
         2,
@@ -142,6 +148,7 @@ public class KafkaIndexTaskTuningConfigTest
     );
     KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
 
+    Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
     Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
     Assert.assertNotEquals(null, copy.getMaxTotalRows());
@@ -158,6 +165,7 @@ public class KafkaIndexTaskTuningConfigTest
   public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
   {
     KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
+        null,
         1,
         null,
         2,
@@ -183,6 +191,7 @@ public class KafkaIndexTaskTuningConfigTest
         mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
 
     Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
     Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
     Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -206,6 +215,7 @@ public class KafkaIndexTaskTuningConfigTest
   public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
   {
     TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
+        null,
         1,
         null,
         2,
@@ -231,6 +241,7 @@ public class KafkaIndexTaskTuningConfigTest
     KafkaIndexTaskTuningConfig deserialized =
         mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
 
+    Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
     Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
     Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -249,4 +260,12 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
     Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class)
+        .usingGetClass()
+        .verify();
+  }
 }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 905abd1..251d837 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -270,6 +270,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         ),
         null
@@ -3070,6 +3071,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         kafkaHost,
         dataSchema,
         new KafkaSupervisorTuningConfig(
+            null,
             1000,
             null,
             50000,
@@ -3109,6 +3111,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     DataSchema modifiedDataSchema = getDataSchema("some other datasource");
 
     KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
+        null,
         42, // This is different
         null,
         50000,
@@ -3404,6 +3407,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     };
 
     final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
+        null,
         1000,
         null,
         50000,
@@ -3514,6 +3518,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     };
 
     final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
+        null,
         1000,
         null,
         50000,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 5859f90..7151eb0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -59,6 +60,7 @@ public class KafkaSupervisorTuningConfigTest
     );
 
     Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -94,7 +96,8 @@ public class KafkaSupervisorTuningConfigTest
                      + "  \"shutdownTimeout\": \"PT95S\",\n"
                      + "  \"offsetFetchPeriod\": \"PT20S\",\n"
                      + "  \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
-                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+                     + "  \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
                      + "}";
 
     KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
@@ -108,6 +111,7 @@ public class KafkaSupervisorTuningConfigTest
     );
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 27e69e8..06550e1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
@@ -37,6 +38,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
 
   @JsonCreator
   public TestModifiedKafkaIndexTaskTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -60,6 +62,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
diff --git a/extensions-core/kinesis-indexing-service/pom.xml b/extensions-core/kinesis-indexing-service/pom.xml
index 6b08d50..0659bed 100644
--- a/extensions-core/kinesis-indexing-service/pom.xml
+++ b/extensions-core/kinesis-indexing-service/pom.xml
@@ -192,6 +192,11 @@
             <artifactId>system-rules</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index f033a6d..428f54f 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
@@ -50,6 +51,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
 
   @JsonCreator
   public KinesisIndexTaskTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -78,6 +80,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
@@ -154,6 +157,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
   public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir)
   {
     return new KinesisIndexTaskTuningConfig(
+        getAppendableIndexSpec(),
         getMaxRowsInMemory(),
         getMaxBytesInMemory(),
         getMaxRowsPerSegment(),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 8c1a5fa..7cf49a3 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -75,11 +76,13 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
 
   public KinesisSupervisorTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -115,6 +118,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
@@ -248,6 +252,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
   public KinesisIndexTaskTuningConfig convertToTaskTuningConfig()
   {
     return new KinesisIndexTaskTuningConfig(
+        getAppendableIndexSpec(),
         getMaxRowsInMemory(),
         getMaxBytesInMemory(),
         getMaxRowsPerSegment(),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 9ab058c..f6b3582 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -72,6 +72,7 @@ public class KinesisIndexTaskSerdeTest
       null,
       null,
       null,
+      null,
       null
   );
   private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 4aff50b..8ff5b43 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2739,6 +2739,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
     boolean resetOffsetAutomatically = false;
     int maxRowsInMemory = 1000;
     final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+        null,
         maxRowsInMemory,
         null,
         maxRowsPerSegment,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 15e56c0..69a70a4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.kinesis;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
 import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Period;
@@ -66,6 +68,7 @@ public class KinesisIndexTaskTuningConfigTest
     );
 
     Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -102,7 +105,8 @@ public class KinesisIndexTaskTuningConfigTest
                      + "  \"fetchSequenceNumberTimeout\": 6000,\n"
                      + "  \"resetOffsetAutomatically\": false,\n"
                      + "  \"skipSequenceNumberAvailabilityCheck\": true,\n"
-                     + "  \"fetchThreads\": 2\n"
+                     + "  \"fetchThreads\": 2,\n"
+                     + "  \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
                      + "}";
 
     KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue(
@@ -116,6 +120,7 @@ public class KinesisIndexTaskTuningConfigTest
     );
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
@@ -136,6 +141,7 @@ public class KinesisIndexTaskTuningConfigTest
   public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
   {
     KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+        null,
         1,
         3L,
         2,
@@ -168,6 +174,7 @@ public class KinesisIndexTaskTuningConfigTest
         mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
 
     Assert.assertEquals(null, deserialized.getExtra());
+    Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
     Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
     Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -195,6 +202,7 @@ public class KinesisIndexTaskTuningConfigTest
   public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
   {
     KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+        null,
         1,
         3L,
         2,
@@ -226,6 +234,7 @@ public class KinesisIndexTaskTuningConfigTest
     KinesisIndexTaskTuningConfig deserialized =
         mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
 
+    Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
     Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
     Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
     Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -282,6 +291,7 @@ public class KinesisIndexTaskTuningConfigTest
   public void testConvert()
   {
     KinesisSupervisorTuningConfig original = new KinesisSupervisorTuningConfig(
+        null,
         1,
         (long) 3,
         2,
@@ -317,6 +327,7 @@ public class KinesisIndexTaskTuningConfigTest
     );
     KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
 
+    Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
     Assert.assertEquals(3, copy.getMaxBytesInMemory());
     Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
@@ -338,4 +349,12 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(100, copy.getMaxRecordsPerPoll());
     Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod());
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(KinesisIndexTaskTuningConfig.class)
+        .usingGetClass()
+        .verify();
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 1351662..3d57a6b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -167,6 +167,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
 
     tuningConfig = new KinesisSupervisorTuningConfig(
+        null,
         1000,
         null,
         50000,
@@ -3689,6 +3690,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     DataSchema modifiedDataSchema = getDataSchema("some other datasource");
 
     KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
+        null,
         1000,
         null,
         50000,
@@ -4741,6 +4743,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     };
 
     final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig(
+        null,
         1000,
         null,
         50000,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index d503210..ff3edc4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -58,6 +59,7 @@ public class KinesisSupervisorTuningConfigTest
     );
 
     Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -92,7 +94,8 @@ public class KinesisSupervisorTuningConfigTest
                      + "  \"chatRetries\": 14,\n"
                      + "  \"httpTimeout\": \"PT15S\",\n"
                      + "  \"shutdownTimeout\": \"PT95S\",\n"
-                     + "  \"repartitionTransitionDuration\": \"PT500S\"\n"
+                     + "  \"repartitionTransitionDuration\": \"PT500S\",\n"
+                     + "  \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
                      + "}";
 
     KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue(
@@ -106,6 +109,7 @@ public class KinesisSupervisorTuningConfigTest
     );
 
     Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index e45168d..5b2e2bd 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
@@ -37,6 +38,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
 
   @JsonCreator
   public TestModifiedKinesisIndexTaskTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -66,6 +68,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
   )
   {
     super(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxRowsPerSegment,
@@ -98,6 +101,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
   public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
   {
     super(
+        base.getAppendableIndexSpec(),
         base.getMaxRowsInMemory(),
         base.getMaxBytesInMemory(),
         base.getMaxRowsPerSegment(),
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index 2b80d28..f1a8cc8 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
 
 import javax.annotation.Nullable;
@@ -56,6 +57,7 @@ public class HadoopTuningConfig implements TuningConfig
         DEFAULT_SHARD_SPECS,
         DEFAULT_INDEX_SPEC,
         DEFAULT_INDEX_SPEC,
+        DEFAULT_APPENDABLE_INDEX,
         DEFAULT_ROW_FLUSH_BOUNDARY,
         0L,
         false,
@@ -83,6 +85,7 @@ public class HadoopTuningConfig implements TuningConfig
   private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
   private final IndexSpec indexSpec;
   private final IndexSpec indexSpecForIntermediatePersists;
+  private final AppendableIndexSpec appendableIndexSpec;
   private final int rowFlushBoundary;
   private final long maxBytesInMemory;
   private final boolean leaveIntermediate;
@@ -108,6 +111,7 @@ public class HadoopTuningConfig implements TuningConfig
       final @JsonProperty("shardSpecs") @Nullable Map<Long, List<HadoopyShardSpec>> shardSpecs,
       final @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+      final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@@ -140,8 +144,9 @@ public class HadoopTuningConfig implements TuningConfig
     this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
                                                       ? DEFAULT_ROW_FLUSH_BOUNDARY
                                                       : maxRowsInMemoryCOMPAT : maxRowsInMemory;
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
@@ -211,6 +216,13 @@ public class HadoopTuningConfig implements TuningConfig
     return indexSpecForIntermediatePersists;
   }
 
+  @JsonProperty
+  @Override
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
   @JsonProperty("maxRowsInMemory")
   public int getRowFlushBoundary()
   {
@@ -218,6 +230,7 @@ public class HadoopTuningConfig implements TuningConfig
   }
 
   @JsonProperty
+  @Override
   public long getMaxBytesInMemory()
   {
     return maxBytesInMemory;
@@ -327,6 +340,7 @@ public class HadoopTuningConfig implements TuningConfig
         shardSpecs,
         indexSpec,
         indexSpecForIntermediatePersists,
+        appendableIndexSpec,
         rowFlushBoundary,
         maxBytesInMemory,
         leaveIntermediate,
@@ -357,6 +371,7 @@ public class HadoopTuningConfig implements TuningConfig
         shardSpecs,
         indexSpec,
         indexSpecForIntermediatePersists,
+        appendableIndexSpec,
         rowFlushBoundary,
         maxBytesInMemory,
         leaveIntermediate,
@@ -387,6 +402,7 @@ public class HadoopTuningConfig implements TuningConfig
         specs,
         indexSpec,
         indexSpecForIntermediatePersists,
+        appendableIndexSpec,
         rowFlushBoundary,
         maxBytesInMemory,
         leaveIntermediate,
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index 91982fd..abc0b3b 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -50,7 +50,6 @@ import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
@@ -302,11 +301,12 @@ public class IndexGeneratorJob implements Jobby
         .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
         .build();
 
-    IncrementalIndex newIndex = new IncrementalIndex.Builder()
+    // Build the incremental-index according to the spec that was chosen by the user
+    IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(tuningConfig.getRowFlushBoundary())
-        .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
-        .buildOnheap();
+        .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
+        .build();
 
     if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
       newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 4b8d72e..644ae24 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -465,6 +465,7 @@ public class BatchDeltaIngestionTest
                 null,
                 null,
                 null,
+                null,
                 false,
                 false,
                 false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
index 93eda94..a127d51 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -219,6 +219,7 @@ public class DetermineHashedPartitionsJobTest
             null,
             null,
             null,
+            null,
             false,
             false,
             false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index 6906eaf..047c104 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -328,6 +328,7 @@ public class DeterminePartitionsJobTest
                 null,
                 null,
                 null,
+                null,
                 false,
                 false,
                 false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
index c9937e0..9eabd41 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -241,6 +241,7 @@ public class HadoopDruidIndexerConfigTest
           null,
           null,
           null,
+          null,
           false,
           false,
           false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
index c1a7e95..fce828b 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,6 +45,7 @@ public class HadoopTuningConfigTest
         null,
         null,
         null,
+        null,
         100,
         null,
         true,
@@ -68,6 +70,7 @@ public class HadoopTuningConfigTest
 
     Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath());
     Assert.assertEquals("version", actual.getVersion());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), actual.getAppendableIndexSpec());
     Assert.assertNotNull(actual.getPartitionsSpec());
     Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
     Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index cd93236..8d59554 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -529,6 +529,7 @@ public class IndexGeneratorJobTest
                 null,
                 null,
                 null,
+                null,
                 maxRowsInMemory,
                 maxBytesInMemory,
                 true,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
index 1cdb9b2..3188762 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
@@ -165,6 +165,7 @@ public class JobHelperTest
                 null,
                 null,
                 null,
+                null,
                 false,
                 false,
                 false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
index 4da9ee4..68505be 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
@@ -63,6 +63,7 @@ public class GranularityPathSpecTest
       null,
       null,
       null,
+      null,
       false,
       false,
       false,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index eec9b98..1538e15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -37,9 +38,8 @@ import javax.annotation.Nullable;
 import java.io.File;
 
 @JsonTypeName("realtime_appenderator")
-public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
+public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
 {
-  private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
   private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
   private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
   private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
@@ -53,6 +53,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
     return FileUtils.createTempDir("druid-realtime-persist");
   }
 
+  private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final DynamicPartitionsSpec partitionsSpec;
@@ -74,6 +75,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
 
   @JsonCreator
   public RealtimeAppenderatorTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -93,9 +95,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
-    // initializing this to 0, it will be lazily intialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    // initializing this to 0, it will be lazily initialized to a value
+    // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
     this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
@@ -137,6 +140,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
 
   @Override
   @JsonProperty
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
+  @Override
+  @JsonProperty
   public int getMaxRowsInMemory()
   {
     return maxRowsInMemory;
@@ -260,6 +270,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
   public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
   {
     return new RealtimeAppenderatorTuningConfig(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         partitionsSpec.getMaxRowsPerSegment(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index ba2502f..afaf0cf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -219,6 +219,7 @@ public class CompactionTask extends AbstractBatchIndexTask
       return new ParallelIndexTuningConfig(
           null,
           indexTuningConfig.getMaxRowsPerSegment(),
+          indexTuningConfig.getAppendableIndexSpec(),
           indexTuningConfig.getMaxRowsPerSegment(),
           indexTuningConfig.getMaxBytesInMemory(),
           indexTuningConfig.getMaxTotalRows(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 16e03f1..f351a5d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.BatchIOConfig;
@@ -1110,7 +1111,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     }
   }
 
-  public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
+  public static class IndexTuningConfig implements AppenderatorConfig
   {
     private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
     private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
@@ -1118,6 +1119,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
     private static final long DEFAULT_PUSH_TIMEOUT = 0;
 
+    private final AppendableIndexSpec appendableIndexSpec;
     private final int maxRowsInMemory;
     private final long maxBytesInMemory;
 
@@ -1189,6 +1191,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public IndexTuningConfig(
         @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
         @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+        @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
         @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
         @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
         @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -1211,6 +1214,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     )
     {
       this(
+          appendableIndexSpec,
           maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
           maxBytesInMemory != null ? maxBytesInMemory : 0,
           getPartitionsSpec(
@@ -1242,10 +1246,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     private IndexTuningConfig()
     {
-      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
     }
 
     private IndexTuningConfig(
+        @Nullable AppendableIndexSpec appendableIndexSpec,
         @Nullable Integer maxRowsInMemory,
         @Nullable Long maxBytesInMemory,
         @Nullable PartitionsSpec partitionsSpec,
@@ -1262,9 +1267,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         @Nullable Integer maxSavedParseExceptions
     )
     {
+      this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
       this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
       // initializing this to 0, it will be lazily initialized to a value
-      // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+      // @see #getMaxBytesInMemoryOrDefault()
       this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
       this.partitionsSpec = partitionsSpec;
       this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@@ -1300,6 +1306,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public IndexTuningConfig withBasePersistDirectory(File dir)
     {
       return new IndexTuningConfig(
+          appendableIndexSpec,
           maxRowsInMemory,
           maxBytesInMemory,
           partitionsSpec,
@@ -1320,6 +1327,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
     {
       return new IndexTuningConfig(
+          appendableIndexSpec,
           maxRowsInMemory,
           maxBytesInMemory,
           partitionsSpec,
@@ -1339,6 +1347,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
 
     @JsonProperty
     @Override
+    public AppendableIndexSpec getAppendableIndexSpec()
+    {
+      return appendableIndexSpec;
+    }
+
+    @JsonProperty
+    @Override
     public int getMaxRowsInMemory()
     {
       return maxRowsInMemory;
@@ -1514,7 +1529,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
         return false;
       }
       IndexTuningConfig that = (IndexTuningConfig) o;
-      return maxRowsInMemory == that.maxRowsInMemory &&
+      return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+             maxRowsInMemory == that.maxRowsInMemory &&
              maxBytesInMemory == that.maxBytesInMemory &&
              maxPendingPersists == that.maxPendingPersists &&
              forceGuaranteedRollup == that.forceGuaranteedRollup &&
@@ -1534,6 +1550,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
     public int hashCode()
     {
       return Objects.hash(
+          appendableIndexSpec,
           maxRowsInMemory,
           maxBytesInMemory,
           partitionsSpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index d5d8a8f..5cbfc01 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -929,6 +929,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     return new IndexTuningConfig(
         null,
         null,
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
         tuningConfig.getMaxBytesInMemory(),
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 935eeb4..ca7fdf1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Duration;
 import org.joda.time.Period;
@@ -97,6 +98,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
@@ -105,6 +107,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
   public ParallelIndexTuningConfig(
       @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
       @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
       @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -134,6 +137,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
     super(
         targetPartitionSize,
         maxRowsPerSegment,
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         maxTotalRows,
@@ -248,6 +252,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
     return new ParallelIndexTuningConfig(
         null,
         null,
+        getAppendableIndexSpec(),
         getMaxRowsInMemory(),
         getMaxBytesInMemory(),
         null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index c192703..b56d8e5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -32,11 +33,12 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.Objects;
 
-public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig
+public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig
 {
   private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
   private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
 
+  private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final DynamicPartitionsSpec partitionsSpec;
@@ -59,6 +61,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
   private final int maxSavedParseExceptions;
 
   public SeekableStreamIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
       @Nullable Integer maxRowsInMemory,
       @Nullable Long maxBytesInMemory,
       @Nullable Integer maxRowsPerSegment,
@@ -84,10 +87,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
     // Cannot be a static because default basePersistDirectory is unique per-instance
     final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
 
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
     this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
     // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? defaults.getIntermediatePersistPeriod()
@@ -132,6 +136,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
 
   @Override
   @JsonProperty
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
+  @Override
+  @JsonProperty
   public int getMaxRowsInMemory()
   {
     return maxRowsInMemory;
@@ -281,7 +292,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
       return false;
     }
     SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
+    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+           maxRowsInMemory == that.maxRowsInMemory &&
            maxBytesInMemory == that.maxBytesInMemory &&
            maxPendingPersists == that.maxPendingPersists &&
            reportParseExceptions == that.reportParseExceptions &&
@@ -304,6 +316,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
   public int hashCode()
   {
     return Objects.hash(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         partitionsSpec,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index a9d82b1..9a8ae20 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1394,6 +1394,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
         null
     );
     RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
+        null,
         1000,
         null,
         maxRowsPerSegment,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 0692c60..0daffc6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -204,6 +204,7 @@ public class ClientCompactionTaskQuerySerdeTest
             new ParallelIndexTuningConfig(
                 null,
                 null,
+                null,
                 40000,
                 2000L,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 08cd765..1fce094 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -279,6 +279,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
                 null,
                 null,
                 null,
+                null,
                 new HashedPartitionsSpec(null, 3, null),
                 null,
                 null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c82f1e7..14c0840 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -305,6 +305,7 @@ public class CompactionTaskTest
     return new ParallelIndexTuningConfig(
         null,
         null, // null to compute maxRowsPerSegment automatically
+        null,
         500000,
         1000000L,
         null,
@@ -438,6 +439,7 @@ public class CompactionTaskTest
         new IndexTuningConfig(
             null,
             null, // null to compute maxRowsPerSegment automatically
+            null,
             500000,
             1000000L,
             null,
@@ -583,6 +585,7 @@ public class CompactionTaskTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         100000,
         null,
+        null,
         500000,
         1000000L,
         null,
@@ -650,6 +653,7 @@ public class CompactionTaskTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         500000,
         1000000L,
         1000000L,
@@ -717,6 +721,7 @@ public class CompactionTaskTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         500000,
         1000000L,
         null,
@@ -1105,6 +1110,7 @@ public class CompactionTaskTest
         new ParallelIndexTuningConfig(
             null,
             null,
+            null,
             500000,
             1000000L,
             Long.MAX_VALUE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
index 23f5748..bf2e444 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
@@ -59,6 +59,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         null,
+        null,
         100,
         2000L,
         null,
@@ -92,6 +93,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         null,
+        null,
         100,
         2000L,
         null,
@@ -125,6 +127,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         1000,
+        null,
         100,
         2000L,
         3000L,
@@ -158,6 +161,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         null,
+        null,
         100,
         2000L,
         null,
@@ -193,6 +197,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         null,
+        null,
         100,
         2000L,
         null,
@@ -227,6 +232,7 @@ public class IndexTaskSerdeTest
     final IndexTuningConfig tuningConfig = new IndexTuningConfig(
         null,
         null,
+        null,
         100,
         2000L,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index fdd3096..0d4810a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import com.google.common.io.Files;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
@@ -1122,6 +1123,7 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         null,
         null,
+        null,
         new HashedPartitionsSpec(2, null, null),
         INDEX_SPEC,
         null,
@@ -1250,6 +1252,7 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         null,
         null,
+        null,
         new DynamicPartitionsSpec(2, null),
         INDEX_SPEC,
         null,
@@ -1370,6 +1373,7 @@ public class IndexTaskTest extends IngestionTestBase
         null,
         null,
         null,
+        null,
         new HashedPartitionsSpec(2, null, null),
         INDEX_SPEC,
         null,
@@ -1808,6 +1812,7 @@ public class IndexTaskTest extends IngestionTestBase
     return new IndexTuningConfig(
         null,
         maxRowsPerSegment,
+        null,
         maxRowsInMemory,
         maxBytesInMemory,
         maxTotalRows,
@@ -1998,4 +2003,12 @@ public class IndexTaskTest extends IngestionTestBase
       );
     }
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(IndexTuningConfig.class)
+        .usingGetClass()
+        .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index b87dcf7..2038441 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -830,6 +830,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
         null
     );
     RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
+        null,
         1000,
         null,
         new Period("P1Y"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 9dfbd4e..aa3d33b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -249,6 +249,7 @@ public class TaskSerdeTest
             new IndexTuningConfig(
                 null,
                 null,
+                null,
                 10,
                 null,
                 null,
@@ -329,6 +330,7 @@ public class TaskSerdeTest
             new IndexTuningConfig(
                 null,
                 null,
+                null,
                 10,
                 null,
                 null,
@@ -396,6 +398,7 @@ public class TaskSerdeTest
             ),
 
             new RealtimeTuningConfig(
+                null,
                 1,
                 10L,
                 new Period("PT10M"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index d87d1f0..45b0f38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -157,6 +157,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
           null,
           null,
           null,
+          null,
           2,
           null,
           null,
@@ -227,6 +228,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
         null,
         null,
         null,
+        null,
         new MaxSizeSplitHintSpec(null, 1),
         partitionsSpec,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 37fb265..fc193b0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -178,6 +178,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
             null,
             null,
             null,
+            null,
             numTotalSubTasks,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 0a30bf7..be6b0a8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -427,6 +427,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
             null,
             null,
             null,
+            null,
             NUM_SUB_TASKS,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index e6b144f..a80e3a8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -271,6 +271,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
           null,
           null,
           null,
+          null,
           partitionsSpec,
           null,
           null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 710df1d..6088235 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -195,6 +195,7 @@ public class ParallelIndexSupervisorTaskTest
       final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
           null,
           null,
+          null,
           10,
           1000L,
           null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index 931fd24..c2f7123 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -158,6 +158,7 @@ class ParallelIndexTestingFactory
       return new ParallelIndexTuningConfig(
           1,
           null,
+          null,
           3,
           4L,
           5L,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
index 101c974..a826ae9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -69,6 +70,7 @@ public class ParallelIndexTuningConfigTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -111,6 +113,7 @@ public class ParallelIndexTuningConfigTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -153,6 +156,7 @@ public class ParallelIndexTuningConfigTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -197,6 +201,7 @@ public class ParallelIndexTuningConfigTest
     final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -238,6 +243,7 @@ public class ParallelIndexTuningConfigTest
     new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -279,6 +285,7 @@ public class ParallelIndexTuningConfigTest
     new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -320,6 +327,7 @@ public class ParallelIndexTuningConfigTest
     new ParallelIndexTuningConfig(
         null,
         null,
+        null,
         10,
         1000L,
         null,
@@ -351,4 +359,12 @@ public class ParallelIndexTuningConfigTest
         null
     );
   }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(ParallelIndexTuningConfig.class)
+        .usingGetClass()
+        .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index b7f107d..e444f80 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -352,6 +352,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
             null,
             null,
             null,
+            null,
             1,
             null,
             null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 99ab9a4..48f75d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -746,6 +746,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
             new IndexTuningConfig(
                 null,
                 10000,
+                null,
                 10,
                 null,
                 null,
@@ -826,6 +827,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
             new IndexTuningConfig(
                 null,
                 10000,
+                null,
                 10,
                 null,
                 null,
@@ -1251,6 +1253,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
             new IndexTuningConfig(
                 null,
                 10000,
+                null,
                 10,
                 null,
                 null,
@@ -1358,6 +1361,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
             new IndexTuningConfig(
                 null,
                 10000,
+                null,
                 10,
                 null,
                 null,
@@ -1468,6 +1472,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
     );
     RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
+        null,
         1000,
         null,
         new Period("P1Y"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 6bfbd7f..2cefebd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -898,6 +898,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         )
         {
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
similarity index 53%
copy from server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
copy to processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
index 6d02ca4..177c7a5 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
@@ -17,26 +17,28 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.jackson;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.utils.JvmUtils;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 
-/**
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "realtime", value = RealtimeTuningConfig.class)
-})
-public interface TuningConfig
+public class AppendableIndexModule extends SimpleModule
 {
-  boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
-  int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
-  int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
-  int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+  public AppendableIndexModule()
+  {
+    super("AppendableIndexFactories");
+
+    setMixInAnnotation(AppendableIndexSpec.class, AppendableIndexSpecMixin.class);
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnheapIncrementalIndex.Spec.class)
+  @JsonSubTypes(value = {
+      @JsonSubTypes.Type(name = OnheapIncrementalIndex.Spec.TYPE, value = OnheapIncrementalIndex.Spec.class),
+  })
+  public interface AppendableIndexSpecMixin
+  {
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
index 235d467..71cf30f 100644
--- a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
+++ b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
@@ -49,6 +49,7 @@ public class DefaultObjectMapper extends ObjectMapper
     registerModule(new AggregatorsModule());
     registerModule(new StringComparatorModule());
     registerModule(new SegmentizerModule());
+    registerModule(new AppendableIndexModule());
 
     configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
     configure(MapperFeature.AUTO_DETECT_GETTERS, false);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
index 877fd34..f0b2a9e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
@@ -39,9 +39,12 @@ import org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.PostAggregator;
 import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.incremental.AppendableIndexBuilder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
@@ -117,24 +120,24 @@ public class GroupByQueryHelper
         .withMinTimestamp(granTimeStart.getMillis())
         .build();
 
+
+    final AppendableIndexBuilder indexBuilder;
+
     if (query.getContextValue("useOffheap", false)) {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOffheap(bufferPool);
+      indexBuilder = new OffheapIncrementalIndex.Builder()
+          .setBufferPool(bufferPool);
     } else {
-      index = new IncrementalIndex.Builder()
-          .setIndexSchema(indexSchema)
-          .setDeserializeComplexMetrics(false)
-          .setConcurrentEventAdd(true)
-          .setSortFacts(sortResults)
-          .setMaxRowCount(querySpecificConfig.getMaxResults())
-          .buildOnheap();
+      indexBuilder = new OnheapIncrementalIndex.Builder();
     }
 
+    index = indexBuilder
+        .setIndexSchema(indexSchema)
+        .setDeserializeComplexMetrics(false)
+        .setConcurrentEventAdd(true)
+        .setSortFacts(sortResults)
+        .setMaxRowCount(querySpecificConfig.getMaxResults())
+        .build();
+
     Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
     {
       @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
new file mode 100644
index 0000000..220f0e3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+  @Nullable
+  protected IncrementalIndexSchema incrementalIndexSchema = null;
+  protected boolean deserializeComplexMetrics = true;
+  protected boolean concurrentEventAdd = false;
+  protected boolean sortFacts = true;
+  protected int maxRowCount = 0;
+  protected long maxBytesInMemory = 0;
+
+  protected final Logger log = new Logger(this.getClass().getName());
+
+  public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+  {
+    this.incrementalIndexSchema = incrementalIndexSchema;
+    return this;
+  }
+
+  /**
+   * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+   * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+   * production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+  {
+    return setSimpleTestingIndexSchema(null, metrics);
+  }
+
+
+  /**
+   * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+   * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+   * would use it in production settings.
+   *
+   * @param metrics variable array of {@link AggregatorFactory} metrics
+   *
+   * @return this
+   */
+  @VisibleForTesting
+  public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+  {
+    IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+    this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+    return this;
+  }
+
+  public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+  {
+    this.deserializeComplexMetrics = deserializeComplexMetrics;
+    return this;
+  }
+
+  public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+  {
+    this.concurrentEventAdd = concurrentEventAdd;
+    return this;
+  }
+
+  public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+  {
+    this.sortFacts = sortFacts;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+  {
+    this.maxRowCount = maxRowCount;
+    return this;
+  }
+
+  public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+  {
+    this.maxBytesInMemory = maxBytesInMemory;
+    return this;
+  }
+
+  public void validate()
+  {
+    if (maxRowCount <= 0) {
+      throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+    }
+
+    if (incrementalIndexSchema == null) {
+      throw new IllegalArgumentException("incrementIndexSchema cannot be null");
+    }
+  }
+
+  public final IncrementalIndex build()
+  {
+    log.debug("Building appendable index.");
+    validate();
+    return buildInner();
+  }
+
+  protected abstract IncrementalIndex buildInner();
+}
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
similarity index 53%
rename from server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
index 9c470ab..67cdabd 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
@@ -17,25 +17,19 @@
  * under the License.
  */
 
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
 
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * AppendableIndexSpec describes the in-memory indexing method for data ingestion.
+ */
+@UnstableApi
+public interface AppendableIndexSpec
 {
-  private TuningConfigs()
-  {
-  }
+  // Returns a builder of the appendable index.
+  AppendableIndexBuilder builder();
 
-  public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory)
-  {
-    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
-    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
-    // the actual task node's jvm memory.
-    long newMaxBytesInMemory = maxBytesInMemory;
-    if (maxBytesInMemory == 0) {
-      newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY;
-    } else if (maxBytesInMemory < 0) {
-      newMaxBytesInMemory = Long.MAX_VALUE;
-    }
-    return newMaxBytesInMemory;
-  }
+  // Returns the default max bytes in memory for this index.
+  long getDefaultMaxBytesInMemory();
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index a718b53..cc338b9 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedRow;
@@ -79,7 +78,6 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -88,7 +86,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -319,126 +316,62 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     }
   }
 
-  public static class Builder
+  /**
+   * This class exists only as backward competability to reduce the number of modified lines.
+   */
+  public static class Builder extends OnheapIncrementalIndex.Builder
   {
-    @Nullable
-    private IncrementalIndexSchema incrementalIndexSchema;
-    private boolean deserializeComplexMetrics;
-    private boolean concurrentEventAdd;
-    private boolean sortFacts;
-    private int maxRowCount;
-    private long maxBytesInMemory;
-
-    public Builder()
-    {
-      incrementalIndexSchema = null;
-      deserializeComplexMetrics = true;
-      concurrentEventAdd = false;
-      sortFacts = true;
-      maxRowCount = 0;
-      maxBytesInMemory = 0;
-    }
-
+    @Override
     public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
     {
-      this.incrementalIndexSchema = incrementalIndexSchema;
-      return this;
+      return (Builder) super.setIndexSchema(incrementalIndexSchema);
     }
 
-    /**
-     * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
-     * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
-     * production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
     {
-      return setSimpleTestingIndexSchema(null, metrics);
+      return (Builder) super.setSimpleTestingIndexSchema(metrics);
     }
 
-
-    /**
-     * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
-     * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
-     * would use it in production settings.
-     *
-     * @param metrics variable array of {@link AggregatorFactory} metrics
-     *
-     * @return this
-     */
-    @VisibleForTesting
+    @Override
     public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
     {
-      IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
-      this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
-      return this;
+      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
     }
 
+    @Override
     public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
     {
-      this.deserializeComplexMetrics = deserializeComplexMetrics;
-      return this;
+      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
     }
 
+    @Override
     public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
     {
-      this.concurrentEventAdd = concurrentEventAdd;
-      return this;
+      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
     }
 
+    @Override
     public Builder setSortFacts(final boolean sortFacts)
     {
-      this.sortFacts = sortFacts;
-      return this;
+      return (Builder) super.setSortFacts(sortFacts);
     }
 
+    @Override
     public Builder setMaxRowCount(final int maxRowCount)
     {
-      this.maxRowCount = maxRowCount;
-      return this;
+      return (Builder) super.setMaxRowCount(maxRowCount);
     }
 
-    //maxBytesInMemory only applies to OnHeapIncrementalIndex
+    @Override
     public Builder setMaxBytesInMemory(final long maxBytesInMemory)
     {
-      this.maxBytesInMemory = maxBytesInMemory;
-      return this;
+      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
     }
 
     public OnheapIncrementalIndex buildOnheap()
     {
-      if (maxRowCount <= 0) {
-        throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
-      }
-
-      return new OnheapIncrementalIndex(
-          Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
-          deserializeComplexMetrics,
-          concurrentEventAdd,
-          sortFacts,
-          maxRowCount,
-          maxBytesInMemory
-      );
-    }
-
-    public IncrementalIndex buildOffheap(final NonBlockingPool<ByteBuffer> bufferPool)
-    {
-      if (maxRowCount <= 0) {
-        throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
-      }
-
-      return new OffheapIncrementalIndex(
-          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
-          deserializeComplexMetrics,
-          concurrentEventAdd,
-          sortFacts,
-          maxRowCount,
-          Objects.requireNonNull(bufferPool, "bufferPool is null")
-      );
+      return (OnheapIncrementalIndex) build();
     }
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 97da994..b3cdabc 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -115,6 +116,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
     selectors = new HashMap<>();
     aggOffsetInBuffer = new int[metrics.length];
 
+    int aggsCurOffsetInBuffer = 0;
+
     for (int i = 0; i < metrics.length; i++) {
       AggregatorFactory agg = metrics[i];
 
@@ -129,15 +132,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
           new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
       );
 
-      if (i == 0) {
-        aggOffsetInBuffer[i] = 0;
-      } else {
-        aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
-      }
+      aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+      aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
     }
 
-    aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
-                                                                    - 1].getMaxIntermediateSizeWithNulls();
+    aggsTotalSize = aggsCurOffsetInBuffer;
 
     return new BufferAggregator[metrics.length];
   }
@@ -346,4 +345,38 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
     }
     aggBuffers.clear();
   }
+
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Nullable
+    NonBlockingPool<ByteBuffer> bufferPool = null;
+
+    public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+    {
+      this.bufferPool = bufferPool;
+      return this;
+    }
+
+    @Override
+    public void validate()
+    {
+      super.validate();
+      if (bufferPool == null) {
+        throw new IllegalArgumentException("bufferPool cannot be null");
+      }
+    }
+
+    @Override
+    protected OffheapIncrementalIndex buildInner()
+    {
+      return new OffheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          Objects.requireNonNull(bufferPool, "bufferPool is null")
+      );
+    }
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 9bbdf60..a72124a 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.utils.JvmUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -40,6 +41,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -434,4 +436,51 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
     }
   }
 
+  public static class Builder extends AppendableIndexBuilder
+  {
+    @Override
+    protected OnheapIncrementalIndex buildInner()
+    {
+      return new OnheapIncrementalIndex(
+          Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
+          deserializeComplexMetrics,
+          concurrentEventAdd,
+          sortFacts,
+          maxRowCount,
+          maxBytesInMemory
+      );
+    }
+  }
+
+  public static class Spec implements AppendableIndexSpec
+  {
+    public static final String TYPE = "onheap";
+
+    @Override
+    public AppendableIndexBuilder builder()
+    {
+      return new Builder();
+    }
+
+    @Override
+    public long getDefaultMaxBytesInMemory()
+    {
+      // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
+      // tracks active index and not the index being flushed to disk, to account for that
+      // we halved default to 1/6(max jvm memory)
+      return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+    }
+
+    @Override
+    public boolean equals(Object that)
+    {
+      return that.getClass().equals(this.getClass());
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(this.getClass());
+    }
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index dfc3e53..534034b 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -63,9 +63,10 @@ import org.apache.druid.segment.CloserRule;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndex;
-import org.apache.druid.segment.incremental.IncrementalIndex.Builder;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
 import org.junit.AfterClass;
@@ -130,10 +131,11 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
     RESOURCE_CLOSER.register(pool1);
     params.add(
         new Object[] {
-            (IndexCreator) factories -> new Builder()
+            (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
+                .setBufferPool(pool1)
                 .setSimpleTestingIndexSchema(factories)
                 .setMaxRowCount(1000000)
-                .buildOffheap(pool1)
+                .build()
         }
     );
     params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
@@ -144,7 +146,8 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
     RESOURCE_CLOSER.register(pool2);
     params.add(
         new Object[] {
-            (IndexCreator) factories -> new Builder()
+            (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
+                .setBufferPool(pool2)
                 .setIndexSchema(
                     new IncrementalIndexSchema.Builder()
                         .withMetrics(factories)
@@ -152,7 +155,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
                         .build()
                 )
                 .setMaxRowCount(1000000)
-                .buildOffheap(pool2)
+                .build()
         }
     );
 
@@ -173,7 +176,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
       aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
     }
 
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withDimensionsSpec(dimensionsSpec)
@@ -181,7 +184,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
                 .build()
         )
         .setMaxRowCount(1000000)
-        .buildOnheap();
+        .build();
   }
 
   public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
@@ -190,10 +193,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
       aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
     }
 
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(aggregatorFactories)
         .setMaxRowCount(1000000)
-        .buildOnheap();
+        .build();
   }
 
   public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories)
@@ -202,10 +205,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
       aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
     }
 
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(false, aggregatorFactories)
         .setMaxRowCount(1000000)
-        .buildOnheap();
+        .build();
   }
 
   public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
@@ -722,7 +725,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testgetDimensions()
   {
-    final IncrementalIndex<Aggregator> incrementalIndex = new IncrementalIndex.Builder()
+    final IncrementalIndex<Aggregator> incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMetrics(new CountAggregatorFactory("count"))
@@ -736,7 +739,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
                 .build()
         )
         .setMaxRowCount(1000000)
-        .buildOnheap();
+        .build();
     closerRule.closeLater(incrementalIndex);
 
     Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
@@ -745,10 +748,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testDynamicSchemaRollup() throws IndexSizeExceededException
   {
-    IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
+    IncrementalIndex<Aggregator> index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(/* empty */)
         .setMaxRowCount(10)
-        .buildOnheap();
+        .build();
     closerRule.closeLater(index);
     index.add(
         new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index 91863ff..c7b885a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -117,12 +117,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
                 @Override
                 public IncrementalIndex createIndex()
                 {
-                  return new IncrementalIndex.Builder()
+                  return new OnheapIncrementalIndex.Builder()
                       .setIndexSchema(schema)
                       .setDeserializeComplexMetrics(false)
                       .setSortFacts(sortFacts)
                       .setMaxRowCount(1000)
-                      .buildOnheap();
+                      .build();
                 }
               },
               Closer.create()
@@ -141,11 +141,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
                 @Override
                 public IncrementalIndex createIndex()
                 {
-                  return new IncrementalIndex.Builder()
+                  return new OffheapIncrementalIndex.Builder()
+                      .setBufferPool(stupidPool)
                       .setIndexSchema(schema)
                       .setSortFacts(sortFacts)
                       .setMaxRowCount(1000000)
-                      .buildOffheap(stupidPool);
+                      .build();
                 }
               },
               poolCloser
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 728e2ff..48fb592 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
 import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
@@ -41,9 +42,8 @@ import java.io.File;
 /**
  *
  */
-public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
+public class RealtimeTuningConfig implements AppenderatorConfig
 {
-  private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
   private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
   private static final Period DEFAULT_WINDOW_PERIOD = new Period("PT10M");
   private static final VersioningPolicy DEFAULT_VERSIONING_POLICY = new IntervalStartVersioningPolicy();
@@ -65,6 +65,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
   {
     return new RealtimeTuningConfig(
+        DEFAULT_APPENDABLE_INDEX,
         DEFAULT_MAX_ROWS_IN_MEMORY,
         0L,
         DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
@@ -87,6 +88,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
     );
   }
 
+  private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final Period intermediatePersistPeriod;
@@ -110,6 +112,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
 
   @JsonCreator
   public RealtimeTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
       @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@@ -132,9 +135,10 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
       @JsonProperty("dedupColumn") @Nullable String dedupColumn
   )
   {
+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
     // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
@@ -168,6 +172,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
 
   @Override
   @JsonProperty
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
+  @Override
+  @JsonProperty
   public int getMaxRowsInMemory()
   {
     return maxRowsInMemory;
@@ -304,6 +315,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
   {
     return new RealtimeTuningConfig(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         intermediatePersistPeriod,
@@ -330,6 +342,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
   public RealtimeTuningConfig withBasePersistDirectory(File dir)
   {
     return new RealtimeTuningConfig(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         intermediatePersistPeriod,
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
index 6d02ca4..e3a4e1f 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
@@ -21,7 +21,8 @@ package org.apache.druid.segment.indexing;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 
 /**
  */
@@ -32,11 +33,37 @@ import org.apache.druid.utils.JvmUtils;
 public interface TuningConfig
 {
   boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+  AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
   int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
-  // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
-  // tracks active index and not the index being flushed to disk, to account for that
-  // we halved default to 1/6(max jvm memory)
-  long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+  /**
+   * The incremental index implementation to use
+   */
+  AppendableIndexSpec getAppendableIndexSpec();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+   */
+  long getMaxBytesInMemory();
+
+  /**
+   * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+   * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+   */
+  default long getMaxBytesInMemoryOrDefault()
+  {
+    // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+    // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+    // the actual task node's jvm memory.
+    final long maxBytesInMemory = getMaxBytesInMemory();
+    if (maxBytesInMemory > 0) {
+      return maxBytesInMemory;
+    } else if (maxBytesInMemory == 0) {
+      return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+    } else {
+      return Long.MAX_VALUE;
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index f8d09e0..259e58c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -21,13 +21,14 @@ package org.apache.druid.segment.realtime.appenderator;
 
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.io.File;
 
-public interface AppenderatorConfig
+public interface AppenderatorConfig extends TuningConfig
 {
   boolean isReportParseExceptions();
 
@@ -36,11 +37,6 @@ public interface AppenderatorConfig
    */
   int getMaxRowsInMemory();
 
-  /**
-   * Maximum number of bytes (estimated) to store in memory before persisting to local storage
-   */
-  long getMaxBytesInMemory();
-
   int getMaxPendingPersists();
 
   /**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c65c2ad..c288837 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -64,7 +64,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
 import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
 import org.apache.druid.segment.realtime.FireHydrant;
@@ -199,7 +198,7 @@ public class AppenderatorImpl implements Appenderator
       this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
     }
 
-    maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
+    maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
   }
 
   @Override
@@ -404,6 +403,7 @@ public class AppenderatorImpl implements Appenderator
           schema,
           identifier.getShardSpec(),
           identifier.getVersion(),
+          tuningConfig.getAppendableIndexSpec(),
           tuningConfig.getMaxRowsInMemory(),
           maxBytesTuningConfig,
           null
@@ -1121,6 +1121,7 @@ public class AppenderatorImpl implements Appenderator
             schema,
             identifier.getShardSpec(),
             identifier.getVersion(),
+            tuningConfig.getAppendableIndexSpec(),
             tuningConfig.getMaxRowsInMemory(),
             maxBytesTuningConfig,
             null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 7fae74c..02f75c7 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -49,6 +49,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.IndexableAdapter;
 import org.apache.druid.segment.ProgressIndicator;
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.ParseExceptionHandler;
 import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -383,6 +384,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
     }
 
     @Override
+    public AppendableIndexSpec getAppendableIndexSpec()
+    {
+      return baseConfig.getAppendableIndexSpec();
+    }
+
+    @Override
     public int getMaxRowsInMemory()
     {
       return Integer.MAX_VALUE; // unlimited, rely on maxBytesInMemory instead
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index bae600c..2aec758 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -260,8 +259,9 @@ public class RealtimePlumber implements Plumber
           schema,
           config.getShardSpec(),
           versioningPolicy.getVersion(sinkInterval),
+          config.getAppendableIndexSpec(),
           config.getMaxRowsInMemory(),
-          TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
+          config.getMaxBytesInMemoryOrDefault(),
           config.getDedupColumn()
       );
       addSink(retVal);
@@ -723,8 +723,9 @@ public class RealtimePlumber implements Plumber
           schema,
           config.getShardSpec(),
           versioningPolicy.getVersion(sinkInterval),
+          config.getAppendableIndexSpec(),
           config.getMaxRowsInMemory(),
-          TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
+          config.getMaxBytesInMemoryOrDefault(),
           config.getDedupColumn(),
           hydrants
       );
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index d6cb223..ccf5e4c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -30,6 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -63,6 +64,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
   private final DataSchema schema;
   private final ShardSpec shardSpec;
   private final String version;
+  private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
   private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
@@ -79,6 +81,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
       DataSchema schema,
       ShardSpec shardSpec,
       String version,
+      AppendableIndexSpec appendableIndexSpec,
       int maxRowsInMemory,
       long maxBytesInMemory,
       String dedupColumn
@@ -89,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
         schema,
         shardSpec,
         version,
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
         dedupColumn,
@@ -101,6 +105,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
       DataSchema schema,
       ShardSpec shardSpec,
       String version,
+      AppendableIndexSpec appendableIndexSpec,
       int maxRowsInMemory,
       long maxBytesInMemory,
       String dedupColumn,
@@ -111,6 +116,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
     this.shardSpec = shardSpec;
     this.interval = interval;
     this.version = version;
+    this.appendableIndexSpec = appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory;
     this.maxBytesInMemory = maxBytesInMemory;
     this.dedupColumn = dedupColumn;
@@ -322,11 +328,13 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
         .withMetrics(schema.getAggregators())
         .withRollup(schema.getGranularitySpec().isRollup())
         .build();
-    final IncrementalIndex newIndex = new IncrementalIndex.Builder()
+
+    // Build the incremental-index according to the spec that was chosen by the user
+    final IncrementalIndex newIndex = appendableIndexSpec.builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(maxRowsInMemory)
         .setMaxBytesInMemory(maxBytesInMemory)
-        .buildOnheap();
+        .build();
 
     final FireHydrant old;
     synchronized (hydrantLock) {
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index ea3f7cf..72f0b8b 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.Period;
@@ -89,6 +90,7 @@ public class RealtimeTuningConfigTest
     );
 
     Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(0, config.getHandoffConditionTimeout());
     Assert.assertEquals(0, config.getAlertTimeout());
     Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@@ -119,7 +121,8 @@ public class RealtimeTuningConfigTest
                      + "  \"handoffConditionTimeout\": 100,\n"
                      + "  \"alertTimeout\": 70,\n"
                      + "  \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
-                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+                     + "  \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+                     + "  \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
                      + "}";
 
     ObjectMapper mapper = TestHelper.makeJsonMapper();
@@ -134,6 +137,7 @@ public class RealtimeTuningConfigTest
     );
 
     Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
+    Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getHandoffConditionTimeout());
     Assert.assertEquals(70, config.getAlertTimeout());
     Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index ee79dc1..87df0e8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -62,6 +62,7 @@ public class AppenderatorPlumberTest
                 EasyMock.anyObject())).andReturn(true).anyTimes();
     
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        null,
         1,
         null,
         null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 9e57d28..c77d429 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -162,6 +162,7 @@ public class AppenderatorTester implements AutoCloseable
         objectMapper
     );
     tuningConfig = new RealtimeTuningConfig(
+        null,
         maxRowsInMemory,
         maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
         null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index 9e9d350..6e38c1c 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -134,6 +134,7 @@ public class DefaultOfflineAppenderatorFactoryTest
     );
 
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        null,
         75000,
         null,
         null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 0eb62e8..9b73cbe 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -48,7 +48,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.join.NoopJoinableFactory;
 import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -200,6 +199,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
     EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
 
     tuningConfig = new RealtimeTuningConfig(
+        null,
         1,
         null,
         null,
@@ -278,8 +278,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
         schema,
         tuningConfig.getShardSpec(),
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
-        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.getMaxBytesInMemoryOrDefault(),
         tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
@@ -323,8 +324,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
         schema,
         tuningConfig.getShardSpec(),
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
-        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.getMaxBytesInMemoryOrDefault(),
         tuningConfig.getDedupColumn()
     );
     plumber.getSinks().put(0L, sink);
@@ -373,8 +375,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
         schema2,
         tuningConfig.getShardSpec(),
         DateTimes.of("2014-12-01T12:34:56.789").toString(),
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
-        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.getMaxBytesInMemoryOrDefault(),
         tuningConfig.getDedupColumn()
     );
     plumber2.getSinks().put(0L, sink);
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index bf94b2f..477c3bd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -34,7 +34,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.FireHydrant;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -66,6 +65,7 @@ public class SinkTest extends InitializedNullHandlingTest
     final Interval interval = Intervals.of("2013-01-01/2013-01-02");
     final String version = DateTimes.nowUtc().toString();
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        null,
         100,
         null,
         new Period("P1Y"),
@@ -91,8 +91,9 @@ public class SinkTest extends InitializedNullHandlingTest
         schema,
         tuningConfig.getShardSpec(),
         version,
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
-        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.getMaxBytesInMemoryOrDefault(),
         tuningConfig.getDedupColumn()
     );
 
@@ -220,6 +221,7 @@ public class SinkTest extends InitializedNullHandlingTest
     final Interval interval = Intervals.of("2013-01-01/2013-01-02");
     final String version = DateTimes.nowUtc().toString();
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+        null,
         100,
         null,
         new Period("P1Y"),
@@ -245,8 +247,9 @@ public class SinkTest extends InitializedNullHandlingTest
         schema,
         tuningConfig.getShardSpec(),
         version,
+        tuningConfig.getAppendableIndexSpec(),
         tuningConfig.getMaxRowsInMemory(),
-        TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+        tuningConfig.getMaxBytesInMemoryOrDefault(),
         tuningConfig.getDedupColumn()
     );
 
diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
index bb759b6..8dec453 100644
--- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
+++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
@@ -156,6 +156,7 @@ public class DruidJsonValidatorTest
             ),
 
             new RealtimeTuningConfig(
+                null,
                 1,
                 null,
                 new Period("PT10M"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org