Posted to commits@druid.apache.org by gi...@apache.org on 2021/01/08 06:19:04 UTC

[druid] branch master updated: IncrementalIndex Tests and Benchmarks Parametrization (#10593)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 08ab82f  IncrementalIndex Tests and Benchmarks Parametrization (#10593)
08ab82f is described below

commit 08ab82f55ca856d60dfe1088c1c0393428b0bb6d
Author: Liran Funaro <li...@verizonmedia.com>
AuthorDate: Fri Jan 8 08:18:47 2021 +0200

    IncrementalIndex Tests and Benchmarks Parametrization (#10593)
    
    * Remove redundant IncrementalIndex.Builder
    
    * Parametrize incremental index tests and benchmarks
    
    - Reveal and fix a bug in OffheapIncrementalIndex
    
    * Fix forbiddenapis error: Forbidden method invocation: java.lang.String#format(java.lang.String,java.lang.Object[]) [Uses default locale]
    
    * Fix IntelliJ errors: declared exception is never thrown
    
    * Add documentation and validate objects before closing them in tearDown.
    
    * Add documentation to OffheapIncrementalIndexTestSpec
    
    * Doc corrections and minor changes.
    
    * Add logging for generated rows.
    
    * Refactor new tests/benchmarks.
    
    * Improve IncrementalIndexCreator documentation
    
    * Add required tests for DataGenerator
    
    * Revert "rollupOpportunity" to be a string
---
 .../druid/benchmark/FilterPartitionBenchmark.java  |   5 +-
 .../benchmark/FilteredAggregatorBenchmark.java     | 184 +++++++++++-----
 .../benchmark/GroupByTypeInterfaceBenchmark.java   |   5 +-
 .../IncrementalIndexRowTypeBenchmark.java          |  58 +++--
 .../benchmark/TopNTypeInterfaceBenchmark.java      |   5 +-
 .../indexing/IncrementalIndexReadBenchmark.java    |  35 +--
 .../indexing/IndexIngestionBenchmark.java          |  51 +++--
 .../benchmark/indexing/IndexMergeBenchmark.java    |  16 +-
 .../benchmark/indexing/IndexPersistBenchmark.java  | 105 +++++----
 .../druid/benchmark/query/GroupByBenchmark.java    | 230 ++++++++++---------
 .../druid/benchmark/query/ScanBenchmark.java       | 180 +++++++++------
 .../druid/benchmark/query/SearchBenchmark.java     | 174 +++++++++------
 .../druid/benchmark/query/TimeseriesBenchmark.java | 189 ++++++++++------
 .../druid/benchmark/query/TopNBenchmark.java       | 173 +++++++++------
 .../query/timecompare/TimeCompareBenchmark.java    |   5 +-
 .../DistinctCountGroupByQueryTest.java             |   5 +-
 .../DistinctCountTimeseriesQueryTest.java          |   5 +-
 .../distinctcount/DistinctCountTopNQueryTest.java  |   5 +-
 .../druid/segment/MapVirtualColumnTestBase.java    |   5 +-
 .../overlord/sampler/InputSourceSampler.java       |   5 +-
 .../firehose/IngestSegmentFirehoseFactoryTest.java |   5 +-
 .../IngestSegmentFirehoseFactoryTimelineTest.java  |   5 +-
 .../druid/segment/generator/DataGenerator.java     |  91 +++++++-
 .../incremental/AppendableIndexBuilder.java        |   2 +-
 .../segment/incremental/IncrementalIndex.java      |  60 -----
 .../incremental/OffheapIncrementalIndex.java       |  35 +--
 .../org/apache/druid/query/DoubleStorageTest.java  |   5 +-
 .../druid/query/MultiValuedDimensionTest.java      |   9 +-
 .../query/aggregation/AggregationTestHelper.java   |  13 +-
 .../first/StringFirstTimeseriesQueryTest.java      |   5 +-
 .../last/StringLastTimeseriesQueryTest.java        |   5 +-
 .../DataSourceMetadataQueryTest.java               |   5 +-
 ...GroupByLimitPushDownInsufficientBufferTest.java |   5 +-
 .../GroupByLimitPushDownMultiNodeMergeTest.java    |   5 +-
 .../query/groupby/GroupByMultiSegmentTest.java     |   5 +-
 .../groupby/GroupByQueryRunnerFactoryTest.java     |   5 +-
 .../query/groupby/NestedQueryPushDownTest.java     |   5 +-
 .../druid/query/metadata/SegmentAnalyzerTest.java  |   5 +-
 .../query/scan/MultiSegmentScanQueryTest.java      |   5 +-
 .../druid/query/search/SearchQueryRunnerTest.java  |   5 +-
 .../timeboundary/TimeBoundaryQueryRunnerTest.java  |   5 +-
 .../timeseries/TimeseriesQueryRunnerBonusTest.java |   5 +-
 .../org/apache/druid/segment/EmptyIndexTest.java   |   5 +-
 .../org/apache/druid/segment/IndexBuilder.java     |   5 +-
 .../java/org/apache/druid/segment/IndexIOTest.java |   9 +-
 .../apache/druid/segment/IndexMergerTestBase.java  |  77 +++----
 .../segment/IndexMergerV9CompatibilityTest.java    |   5 +-
 .../segment/IndexMergerV9WithSpatialIndexTest.java |  17 +-
 .../apache/druid/segment/SchemalessIndexTest.java  |  13 +-
 .../java/org/apache/druid/segment/TestIndex.java   |   5 +-
 .../druid/segment/data/IncrementalIndexTest.java   | 174 ++++++---------
 .../segment/filter/SpatialFilterBonusTest.java     |  17 +-
 .../druid/segment/filter/SpatialFilterTest.java    |  17 +-
 .../druid/segment/generator/DataGeneratorTest.java | 131 ++++++++++-
 .../incremental/IncrementalIndexAdapterTest.java   |  34 ++-
 .../incremental/IncrementalIndexCreator.java       | 244 +++++++++++++++++++++
 ...est.java => IncrementalIndexIngestionTest.java} | 127 ++++++-----
 .../IncrementalIndexMultiValueSpecTest.java        |  32 ++-
 .../incremental/IncrementalIndexRowCompTest.java   |  33 ++-
 .../incremental/IncrementalIndexRowSizeTest.java   |  53 +++--
 .../IncrementalIndexStorageAdapterTest.java        |  57 ++---
 .../segment/incremental/IncrementalIndexTest.java  | 104 ++-------
 .../OffheapIncrementalIndexTestSpec.java           | 107 +++++++++
 .../segment/virtual/ExpressionSelectorsTest.java   |   3 +-
 .../firehose/IngestSegmentFirehoseTest.java        |   9 +-
 65 files changed, 1932 insertions(+), 1076 deletions(-)
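
The core of the change: the redundant IncrementalIndex.Builder is removed, and call sites either name a concrete builder directly or derive one from an AppendableIndexSpec, which is what lets the index type become a test/benchmark parameter. A minimal sketch assembled from the call sites in the diff below (aggs, maxRows, and indexType stand in for each benchmark's own fields):

    // Before: one generic builder, hard-wired to the on-heap implementation.
    IncrementalIndex index = new IncrementalIndex.Builder()
        .setSimpleTestingIndexSchema(aggs)
        .setMaxRowCount(maxRows)
        .buildOnheap();

    // After, fixed type: use the concrete implementation's builder.
    IncrementalIndex<?> onheap = new OnheapIncrementalIndex.Builder()
        .setSimpleTestingIndexSchema(aggs)
        .setMaxRowCount(maxRows)
        .build();

    // After, parametrized type: derive the builder from an AppendableIndexSpec,
    // so "onheap"/"offheap" can be a @Param of the test or benchmark.
    AppendableIndexSpec spec = IncrementalIndexCreator.parseIndexType(indexType);
    IncrementalIndex<?> parametrized = spec.builder()
        .setSimpleTestingIndexSchema(aggs)
        .setMaxRowCount(maxRows)
        .build();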

diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index f7a690f..6908b72 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.joda.time.Interval;
@@ -227,10 +228,10 @@ public class FilterPartitionBenchmark
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   @Benchmark
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index 560148b..fab0831 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.benchmark;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
@@ -68,13 +69,17 @@ import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
@@ -88,7 +93,6 @@ import org.openjdk.jmh.infra.Blackhole;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -113,22 +117,22 @@ public class FilteredAggregatorBenchmark
   @Param({"false", "true"})
   private String vectorize;
 
+  @Param({"true", "false"})
+  private boolean descending;
+
   private static final Logger log = new Logger(FilteredAggregatorBenchmark.class);
   private static final int RNG_SEED = 9999;
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
   public static final ObjectMapper JSON_MAPPER;
-  private IncrementalIndex incIndex;
-  private IncrementalIndex incIndexFilteredAgg;
-  private AggregatorFactory[] filteredMetrics;
-  private QueryableIndex qIndex;
-  private File indexFile;
+
+  private AppendableIndexSpec appendableIndexSpec;
+  private AggregatorFactory filteredMetric;
   private DimFilter filter;
-  private List<InputRow> inputRows;
+  private DataGenerator generator;
   private QueryRunnerFactory factory;
   private GeneratorSchemaInfo schemaInfo;
   private TimeseriesQuery query;
-  private File tmpDir;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -146,8 +150,11 @@ public class FilteredAggregatorBenchmark
     INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
   }
 
+  /**
+   * Set up everything common to benchmarking both the incremental-index and the queryable-index.
+   */
   @Setup
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
@@ -155,15 +162,13 @@ public class FilteredAggregatorBenchmark
 
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
 
-    DataGenerator gen = new DataGenerator(
+    generator = new DataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
         schemaInfo.getDataInterval(),
         rowsPerSegment
     );
 
-    incIndex = makeIncIndex(schemaInfo.getAggsArray());
-
     filter = new OrDimFilter(
         Arrays.asList(
             new BoundDimFilter("dimSequential", "-1", "-1", true, true, null, null, StringComparators.ALPHANUMERIC),
@@ -172,30 +177,7 @@ public class FilteredAggregatorBenchmark
             new InDimFilter("dimSequential", Collections.singletonList("X"), null)
         )
     );
-    filteredMetrics = new AggregatorFactory[1];
-    filteredMetrics[0] = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
-    incIndexFilteredAgg = makeIncIndex(filteredMetrics);
-
-    inputRows = new ArrayList<>();
-    for (int j = 0; j < rowsPerSegment; j++) {
-      InputRow row = gen.nextRow();
-      if (j % 10000 == 0) {
-        log.info(j + " rows generated.");
-      }
-      incIndex.add(row);
-      inputRows.add(row);
-    }
-
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
-    indexFile = INDEX_MERGER_V9.persist(
-        incIndex,
-        tmpDir,
-        new IndexSpec(),
-        null
-    );
-    qIndex = INDEX_IO.loadIndex(indexFile);
+    filteredMetric = new FilteredAggregatorFactory(new CountAggregatorFactory("rows"), filter);
 
     factory = new TimeseriesQueryRunnerFactory(
         new TimeseriesQueryQueryToolChest(),
@@ -205,30 +187,127 @@ public class FilteredAggregatorBenchmark
 
     GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
     QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
-    List<AggregatorFactory> queryAggs = new ArrayList<>();
-    queryAggs.add(filteredMetrics[0]);
+    List<AggregatorFactory> queryAggs = Collections.singletonList(filteredMetric);
 
     query = Druids.newTimeseriesQueryBuilder()
                   .dataSource("blah")
                   .granularity(Granularities.ALL)
                   .intervals(intervalSpec)
                   .aggregators(queryAggs)
-                  .descending(false)
+                  .descending(descending)
                   .build();
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  /**
+   * Setup/teardown everything specific for benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Setup/teardown everything specific for benchmarking the ingestion of the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexIngestState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+    List<InputRow> inputRows;
+
+    @Setup(Level.Invocation)
+    public void setup(FilteredAggregatorBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      inputRows = global.generator.toList(global.rowsPerSegment);
+      incIndex = global.makeIncIndex(new AggregatorFactory[]{global.filteredMetric});
+    }
+
+    @TearDown(Level.Invocation)
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Setup/teardown everything specific for benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    private File qIndexesDir;
+    private QueryableIndex qIndex;
+
+    @Setup
+    public void setup(FilteredAggregatorBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      IncrementalIndex<?> incIndex = global.makeIncIndex(global.schemaInfo.getAggsArray());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+
+      qIndexesDir = FileUtils.createTempDir();
+      log.info("Using temp dir: " + qIndexesDir.getAbsolutePath());
+
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndex,
+          qIndexesDir,
+          new IndexSpec(),
+          null
+      );
+      incIndex.close();
+
+      qIndex = INDEX_IO.loadIndex(indexFile);
+    }
+
+    @TearDown
+    public void tearDown() throws IOException
+    {
+      if (qIndex != null) {
+        qIndex.close();
+      }
+      if (qIndexesDir != null) {
+        // File.delete() cannot remove a non-empty directory; recurse instead.
+        FileUtils.deleteDirectory(qIndexesDir);
+      }
+    }
   }
 
-  private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
+  private IncrementalIndex<?> makeIncIndex(AggregatorFactory[] metrics)
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(metrics)
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query, String vectorize)
@@ -254,11 +333,10 @@ public class FilteredAggregatorBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void ingest(Blackhole blackhole) throws Exception
+  public void ingest(Blackhole blackhole, IncrementalIndexIngestState state) throws Exception
   {
-    incIndexFilteredAgg = makeIncIndex(filteredMetrics);
-    for (InputRow row : inputRows) {
-      int rv = incIndexFilteredAgg.add(row).getRowCount();
+    for (InputRow row : state.inputRows) {
+      int rv = state.incIndex.add(row).getRowCount();
       blackhole.consume(rv);
     }
   }
@@ -266,12 +344,12 @@ public class FilteredAggregatorBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(incIndex, SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
@@ -288,12 +366,12 @@ public class FilteredAggregatorBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndex, SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndex, SegmentId.dummy("qIndex"))
     );
 
     List<Result<TimeseriesResultValue>> results = FilteredAggregatorBenchmark.runQuery(
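
The monolithic setup()/tearDown() pair above is split into nested JMH @State classes that are injected as benchmark-method parameters. JMH instantiates a @State only when a method declares it, so each benchmark pays for exactly the fixture it uses, and the enclosing benchmark object is itself injected into each nested state's @Setup. A stripped-down, self-contained sketch of the pattern (class and field names here are illustrative, not from the commit):

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.TearDown;
    import org.openjdk.jmh.infra.Blackhole;

    @State(Scope.Benchmark)
    public class StateInjectionSketch
    {
      @State(Scope.Benchmark)
      public static class HeavyFixture
      {
        long[] data;

        @Setup
        public void setup(StateInjectionSketch global)  // enclosing state injected by JMH
        {
          data = new long[1_000_000];
        }

        @TearDown
        public void tearDown()
        {
          data = null;  // release the fixture
        }
      }

      @Benchmark
      public void usesFixture(Blackhole bh, HeavyFixture fixture)
      {
        bh.consume(fixture.data.length);  // HeavyFixture is set up only for this method
      }

      @Benchmark
      public void noFixture(Blackhole bh)
      {
        bh.consume(0);  // HeavyFixture.setup() never runs here
      }
    }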
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 1921b43..19fe385 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -413,11 +414,11 @@ public class GroupByTypeInterfaceBenchmark
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setConcurrentEventAdd(true)
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   @TearDown(Level.Trial)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
index cc8d4a3..572d0cb 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/IncrementalIndexRowTypeBenchmark.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.benchmark;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
@@ -28,13 +29,15 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OperationsPerInvocation;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -53,12 +56,16 @@ public class IncrementalIndexRowTypeBenchmark
     NullHandling.initializeForTests();
   }
 
-  private IncrementalIndex incIndex;
-  private IncrementalIndex incFloatIndex;
-  private IncrementalIndex incStrIndex;
+  @Param({"250000"})
+  private int rowsPerSegment;
+
+  @Param({"onheap", "offheap"})
+  private String indexType;
+
+  private AppendableIndexSpec appendableIndexSpec;
+  IncrementalIndex<?> incIndex;
   private static AggregatorFactory[] aggs;
   static final int DIMENSION_COUNT = 8;
-  static final int MAX_ROWS = 250000;
 
   private ArrayList<InputRow> longRows = new ArrayList<InputRow>();
   private ArrayList<InputRow> floatRows = new ArrayList<InputRow>();
@@ -124,46 +131,51 @@ public class IncrementalIndexRowTypeBenchmark
     return new MapBasedInputRow(timestamp, dimensionList, builder.build());
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(aggs)
         .setDeserializeComplexMetrics(false)
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+        .setMaxRowCount(rowsPerSegment)
+        .build();
   }
 
   @Setup
-  public void setup()
+  public void setup() throws JsonProcessingException
   {
-    for (int i = 0; i < MAX_ROWS; i++) {
+    appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
+    for (int i = 0; i < rowsPerSegment; i++) {
       longRows.add(getLongRow(0, DIMENSION_COUNT));
     }
 
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       floatRows.add(getFloatRow(0, DIMENSION_COUNT));
     }
 
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       stringRows.add(getStringRow(0, DIMENSION_COUNT));
     }
   }
 
-  @Setup(Level.Iteration)
+  @Setup(Level.Invocation)
   public void setup2()
   {
     incIndex = makeIncIndex();
-    incFloatIndex = makeIncIndex();
-    incStrIndex = makeIncIndex();
+  }
+
+  @TearDown(Level.Invocation)
+  public void tearDown()
+  {
+    incIndex.close();
   }
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  @OperationsPerInvocation(MAX_ROWS)
   public void normalLongs(Blackhole blackhole) throws Exception
   {
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       InputRow row = longRows.get(i);
       int rv = incIndex.add(row).getRowCount();
       blackhole.consume(rv);
@@ -173,12 +185,11 @@ public class IncrementalIndexRowTypeBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  @OperationsPerInvocation(MAX_ROWS)
   public void normalFloats(Blackhole blackhole) throws Exception
   {
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       InputRow row = floatRows.get(i);
-      int rv = incFloatIndex.add(row).getRowCount();
+      int rv = incIndex.add(row).getRowCount();
       blackhole.consume(rv);
     }
   }
@@ -186,12 +197,11 @@ public class IncrementalIndexRowTypeBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  @OperationsPerInvocation(MAX_ROWS)
   public void normalStrings(Blackhole blackhole) throws Exception
   {
-    for (int i = 0; i < MAX_ROWS; i++) {
+    for (int i = 0; i < rowsPerSegment; i++) {
       InputRow row = stringRows.get(i);
-      int rv = incStrIndex.add(row).getRowCount();
+      int rv = incIndex.add(row).getRowCount();
       blackhole.consume(rv);
     }
   }
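
Since the ingestion benchmarks measure IncrementalIndex.add() itself, the index is now rebuilt and closed around every invocation, so each measurement starts from an empty index and off-heap buffers are released promptly. The lifecycle, as used above:

    @Setup(Level.Invocation)
    public void setup2()
    {
      incIndex = makeIncIndex();  // fresh, empty index for every invocation
    }

    @TearDown(Level.Invocation)
    public void tearDown()
    {
      incIndex.close();           // release (possibly off-heap) resources
    }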
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index fabb86a..b6078ae 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -67,6 +67,7 @@ import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -308,10 +309,10 @@ public class TopNTypeInterfaceBenchmark
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index a49e122..c07c34f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -20,7 +20,6 @@
 package org.apache.druid.benchmark.indexing;
 
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -43,7 +42,9 @@ import org.apache.druid.segment.data.IndexedInts;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.apache.druid.segment.serde.ComplexMetrics;
@@ -57,6 +58,7 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
@@ -82,6 +84,9 @@ public class IncrementalIndexReadBenchmark
   @Param({"true", "false"})
   private boolean rollup;
 
+  @Param({"onheap", "offheap"})
+  private String indexType;
+
   private static final Logger log = new Logger(IncrementalIndexReadBenchmark.class);
   private static final int RNG_SEED = 9999;
 
@@ -89,8 +94,8 @@ public class IncrementalIndexReadBenchmark
     NullHandling.initializeForTests();
   }
 
-  private IncrementalIndex incIndex;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private IncrementalIndex<?> incIndex;
   private GeneratorSchemaInfo schemaInfo;
 
   @Setup
@@ -102,6 +107,10 @@ public class IncrementalIndexReadBenchmark
 
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
 
+    // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+    // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+    appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
     DataGenerator gen = new DataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
@@ -110,20 +119,20 @@ public class IncrementalIndexReadBenchmark
     );
 
     incIndex = makeIncIndex();
+    gen.addToIndex(incIndex, rowsPerSegment);
+  }
 
-    for (int j = 0; j < rowsPerSegment; j++) {
-      InputRow row = gen.nextRow();
-      if (j % 10000 == 0) {
-        log.info(j + " rows generated.");
-      }
-      incIndex.add(row);
+  @TearDown
+  public void tearDown()
+  {
+    if (incIndex != null) {
+      incIndex.close();
     }
-
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMetrics(schemaInfo.getAggsArray())
@@ -131,7 +140,7 @@ public class IncrementalIndexReadBenchmark
                 .build()
         )
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   @Benchmark
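
Throughout the commit, hand-rolled generate-and-add loops are replaced by DataGenerator helpers, which also take over the periodic "rows generated" logging. The shapes used in this diff:

    // Generate rowsPerSegment rows directly into an index:
    gen.addToIndex(incIndex, rowsPerSegment);

    // Generate rows into a list, e.g. to re-ingest them on every invocation:
    List<InputRow> rows = gen.toList(rowsPerSegment);

    // Ingest an existing stream of rows:
    DataGenerator.addStreamToIndex(rows.stream(), incIndex);

    // Reseed, so several segments get distinct but reproducible data:
    gen.reset(RNG_SEED + i).addToIndex(incIndex, rowsPerSegment);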
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 8448358..33819ef 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.benchmark.indexing;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -26,7 +27,9 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -40,10 +43,11 @@ import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
-import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
@@ -61,6 +65,12 @@ public class IndexIngestionBenchmark
   @Param({"true", "false"})
   private boolean rollup;
 
+  @Param({"none", "moderate", "high"})
+  private String rollupOpportunity;
+
+  @Param({"onheap", "offheap"})
+  private String indexType;
+
   private static final Logger log = new Logger(IndexIngestionBenchmark.class);
   private static final int RNG_SEED = 9999;
 
@@ -68,32 +78,31 @@ public class IndexIngestionBenchmark
     NullHandling.initializeForTests();
   }
 
-  private IncrementalIndex incIndex;
-  private ArrayList<InputRow> rows;
+  private AppendableIndexSpec appendableIndexSpec;
+  private IncrementalIndex<?> incIndex;
+  private List<InputRow> rows;
   private GeneratorSchemaInfo schemaInfo;
 
   @Setup
-  public void setup()
+  public void setup() throws JsonProcessingException
   {
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    rows = new ArrayList<InputRow>();
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
 
+    // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+    // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+    appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+
     DataGenerator gen = new DataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
-        schemaInfo.getDataInterval(),
-        rowsPerSegment
+        schemaInfo.getDataInterval().getStartMillis(),
+        IndexPersistBenchmark.getValuesPerTimestamp(rollupOpportunity),
+        1000.0
     );
 
-    for (int i = 0; i < rowsPerSegment; i++) {
-      InputRow row = gen.nextRow();
-      if (i % 10000 == 0) {
-        log.info(i + " rows generated.");
-      }
-      rows.add(row);
-    }
+    rows = gen.toList(rowsPerSegment);
   }
 
   @Setup(Level.Invocation)
@@ -102,9 +111,17 @@ public class IndexIngestionBenchmark
     incIndex = makeIncIndex();
   }
 
-  private IncrementalIndex makeIncIndex()
+  @TearDown(Level.Invocation)
+  public void tearDown()
+  {
+    if (incIndex != null) {
+      incIndex.close();
+    }
+  }
+
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMetrics(schemaInfo.getAggsArray())
@@ -112,7 +129,7 @@ public class IndexIngestionBenchmark
                 .build()
         )
         .setMaxRowCount(rowsPerSegment * 2)
-        .buildOnheap();
+        .build();
   }
 
   @Benchmark
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index ce34070..44be3cc 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -22,7 +22,6 @@ package org.apache.druid.benchmark.indexing;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -37,6 +36,7 @@ import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
@@ -131,15 +131,9 @@ public class IndexMergeBenchmark
           rowsPerSegment
       );
 
-      IncrementalIndex incIndex = makeIncIndex();
+      IncrementalIndex<?> incIndex = makeIncIndex();
 
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
+      gen.addToIndex(incIndex, rowsPerSegment);
 
       tmpDir = FileUtils.createTempDir();
       log.info("Using temp dir: " + tmpDir.getAbsolutePath());
@@ -212,7 +206,7 @@ public class IndexMergeBenchmark
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMetrics(schemaInfo.getAggsArray())
@@ -220,6 +214,6 @@ public class IndexMergeBenchmark
                 .build()
         )
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 }
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 755947d..3679ada 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.benchmark.indexing;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.InputRow;
@@ -32,7 +33,9 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -53,7 +56,7 @@ import org.openjdk.jmh.infra.Blackhole;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
@@ -90,68 +93,84 @@ public class IndexPersistBenchmark
   @Param({"none", "moderate", "high"})
   private String rollupOpportunity;
 
-  private IncrementalIndex incIndex;
-  private ArrayList<InputRow> rows;
+  @Param({"onheap", "offheap"})
+  private String indexType;
+
+  private AppendableIndexSpec appendableIndexSpec;
+  private IncrementalIndex<?> incIndex;
+  private List<InputRow> rows;
   private GeneratorSchemaInfo schemaInfo;
+  private File tmpDir;
 
   @Setup
-  public void setup()
+  public void setup() throws JsonProcessingException
   {
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    rows = new ArrayList<InputRow>();
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
 
-    int valuesPerTimestamp = 1;
-    switch (rollupOpportunity) {
-      case "moderate":
-        valuesPerTimestamp = 1000;
-        break;
-      case "high":
-        valuesPerTimestamp = 10000;
-        break;
-
-    }
+    // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+    // It is used in {@code makeIncIndex()} to instantiate an incremental-index of the specified type.
+    appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
 
     DataGenerator gen = new DataGenerator(
         schemaInfo.getColumnSchemas(),
         RNG_SEED,
         schemaInfo.getDataInterval().getStartMillis(),
-        valuesPerTimestamp,
+        getValuesPerTimestamp(rollupOpportunity),
         1000.0
     );
 
-    for (int i = 0; i < rowsPerSegment; i++) {
-      InputRow row = gen.nextRow();
-      if (i % 10000 == 0) {
-        log.info(i + " rows generated.");
-      }
-      rows.add(row);
+    rows = gen.toList(rowsPerSegment);
+  }
+
+  public static int getValuesPerTimestamp(String rollupOpportunity)
+  {
+    switch (rollupOpportunity) {
+      case "moderate":
+        return 1000;
+      case "high":
+        return 10000;
+      case "none":
+        return 1;
+      default:
+        throw new IllegalArgumentException("Rollup opportunity must be moderate, high or none.");
     }
   }
 
   @Setup(Level.Iteration)
-  public void setup2() throws IOException
+  public void setup2()
   {
     incIndex = makeIncIndex();
-    for (int i = 0; i < rowsPerSegment; i++) {
-      InputRow row = rows.get(i);
-      incIndex.add(row);
-    }
+    DataGenerator.addStreamToIndex(rows.stream(), incIndex);
   }
 
   @TearDown(Level.Iteration)
   public void teardown()
   {
-    incIndex.close();
-    incIndex = null;
+    if (incIndex != null) {
+      incIndex.close();
+    }
   }
 
-  private IncrementalIndex makeIncIndex()
+  @Setup(Level.Invocation)
+  public void setupTemp()
   {
-    return new IncrementalIndex.Builder()
+    tmpDir = FileUtils.createTempDir();
+    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+  }
+
+  @TearDown(Level.Invocation)
+  public void teardownTemp() throws IOException
+  {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  private IncrementalIndex<?> makeIncIndex()
+  {
+    return appendableIndexSpec.builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMetrics(schemaInfo.getAggsArray())
@@ -159,7 +178,7 @@ public class IndexPersistBenchmark
                 .build()
         )
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   @Benchmark
@@ -167,21 +186,13 @@ public class IndexPersistBenchmark
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void persistV9(Blackhole blackhole) throws Exception
   {
-    File tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-    try {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndex,
-          tmpDir,
-          new IndexSpec(),
-          null
-      );
-
-      blackhole.consume(indexFile);
+    File indexFile = INDEX_MERGER_V9.persist(
+        incIndex,
+        tmpDir,
+        new IndexSpec(),
+        null
+    );
 
-    }
-    finally {
-      FileUtils.deleteDirectory(tmpDir);
-    }
+    blackhole.consume(indexFile);
   }
 }
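
The switch that was previously inlined in setup() is now the reusable getValuesPerTimestamp() above, shared with IndexIngestionBenchmark. It controls how many generated rows share each timestamp, and with it how much the index can roll up ("none" -> 1, "moderate" -> 1000, "high" -> 10000). Usage, as in setup() above:

    DataGenerator gen = new DataGenerator(
        schemaInfo.getColumnSchemas(),
        RNG_SEED,
        schemaInfo.getDataInterval().getStartMillis(),
        IndexPersistBenchmark.getValuesPerTimestamp("high"),  // 10000 rows per timestamp
        1000.0
    );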
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 1e93d16..b9808c9 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.benchmark.query;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
@@ -31,7 +32,6 @@ import org.apache.druid.collections.DefaultBlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -85,8 +85,11 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -122,9 +125,6 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class GroupByBenchmark
 {
-  @Param({"4"})
-  private int numSegments;
-
   @Param({"2", "4"})
   private int numProcessingThreads;
 
@@ -156,17 +156,12 @@ public class GroupByBenchmark
     NullHandling.initializeForTests();
   }
 
-  private File tmpDir;
-  private IncrementalIndex anIncrementalIndex;
-  private List<QueryableIndex> queryableIndexes;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private DataGenerator generator;
   private QueryRunnerFactory<ResultRow, GroupByQuery> factory;
-
   private GeneratorSchemaInfo schemaInfo;
   private GroupByQuery query;
 
-  private ExecutorService executorService;
-
   static {
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
@@ -429,15 +424,16 @@ public class GroupByBenchmark
     SCHEMA_QUERY_MAP.put("nulls", nullQueries);
   }
 
+  /**
+   * Set up everything common to benchmarking both the incremental-index and the queryable-index.
+   */
   @Setup(Level.Trial)
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
-
     setupQueries();
 
     String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -447,58 +443,13 @@ public class GroupByBenchmark
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
     query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
 
-    final DataGenerator dataGenerator = new DataGenerator(
+    generator = new DataGenerator(
         schemaInfo.getColumnSchemas(),
-        RNG_SEED + 1,
+        RNG_SEED,
         schemaInfo.getDataInterval(),
         rowsPerSegment
     );
 
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: %s", tmpDir.getAbsolutePath());
-
-    // queryableIndexes   -> numSegments worth of on-disk segments
-    // anIncrementalIndex -> the last incremental index
-    anIncrementalIndex = null;
-    queryableIndexes = new ArrayList<>(numSegments);
-
-    for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment %d/%d", i + 1, numSegments);
-
-      final IncrementalIndex index = makeIncIndex(schemaInfo.isWithRollup());
-
-      for (int j = 0; j < rowsPerSegment; j++) {
-        final InputRow row = dataGenerator.nextRow();
-        if (j % 20000 == 0) {
-          log.info("%,d/%,d rows generated.", i * rowsPerSegment + j, rowsPerSegment * numSegments);
-        }
-        index.add(row);
-      }
-
-      log.info(
-          "%,d/%,d rows generated, persisting segment %d/%d.",
-          (i + 1) * rowsPerSegment,
-          rowsPerSegment * numSegments,
-          i + 1,
-          numSegments
-      );
-
-      final File file = INDEX_MERGER_V9.persist(
-          index,
-          new File(tmpDir, String.valueOf(i)),
-          new IndexSpec(),
-          null
-      );
-
-      queryableIndexes.add(INDEX_IO.loadIndex(file));
-
-      if (i == numSegments - 1) {
-        anIncrementalIndex = index;
-      } else {
-        index.close();
-      }
-    }
-
     NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
         "GroupByBenchmark-computeBufferPool",
         new OffheapBufferGenerator("compute", 250_000_000),
@@ -576,43 +527,114 @@ public class GroupByBenchmark
     );
   }
 
-  private IncrementalIndex makeIncIndex(boolean withRollup)
+  /**
+   * Setup/teardown everything specific for benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
   {
-    return new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withDimensionsSpec(schemaInfo.getDimensionsSpec())
-                .withMetrics(schemaInfo.getAggsArray())
-                .withRollup(withRollup)
-                .build()
-        )
-        .setConcurrentEventAdd(true)
-        .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup(Level.Trial)
+    public void setup(GroupByBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instanciate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex(global.schemaInfo.isWithRollup());
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
   }
 
-  @TearDown(Level.Trial)
-  public void tearDown()
+  /**
+   * Setup/teardown everything specific for benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
   {
-    try {
-      if (anIncrementalIndex != null) {
-        anIncrementalIndex.close();
+    @Param({"4"})
+    private int numSegments;
+
+    private ExecutorService executorService;
+    private File qIndexesDir;
+    private List<QueryableIndex> queryableIndexes;
+
+    @Setup(Level.Trial)
+    public void setup(GroupByBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      executorService = Execs.multiThreaded(global.numProcessingThreads, "GroupByThreadPool[%d]");
+
+      qIndexesDir = FileUtils.createTempDir();
+
+      // numSegments worth of on-disk segments
+      queryableIndexes = new ArrayList<>();
+
+      for (int i = 0; i < numSegments; i++) {
+        log.info("Generating rows for segment %d/%d", i + 1, numSegments);
+
+        final IncrementalIndex<?> incIndex = global.makeIncIndex(global.schemaInfo.isWithRollup());
+        global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+        log.info(
+            "%,d/%,d rows generated, persisting segment %d/%d.",
+            (i + 1) * global.rowsPerSegment,
+            global.rowsPerSegment * numSegments,
+            i + 1,
+            numSegments
+        );
+
+        File indexFile = INDEX_MERGER_V9.persist(
+            incIndex,
+            new File(qIndexesDir, String.valueOf(i)),
+            new IndexSpec(),
+            null
+        );
+        incIndex.close();
+
+        queryableIndexes.add(INDEX_IO.loadIndex(indexFile));
       }
+    }
 
-      if (queryableIndexes != null) {
-        for (QueryableIndex index : queryableIndexes) {
+    @TearDown(Level.Trial)
+    public void tearDown() throws IOException
+    {
+      for (QueryableIndex index : queryableIndexes) {
+        if (index != null) {
           index.close();
         }
       }
-
-      if (tmpDir != null) {
-        FileUtils.deleteDirectory(tmpDir);
+      if (qIndexesDir != null) {
+        // File.delete() cannot remove a non-empty directory; recurse instead.
+        FileUtils.deleteDirectory(qIndexesDir);
       }
     }
-    catch (IOException e) {
-      log.warn(e, "Failed to tear down, temp dir was: %s", tmpDir);
-      throw new RuntimeException(e);
-    }
+  }
+
+  private IncrementalIndex<?> makeIncIndex(boolean withRollup)
+  {
+    return appendableIndexSpec.builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withDimensionsSpec(schemaInfo.getDimensionsSpec())
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(withRollup)
+                .build()
+        )
+        .setConcurrentEventAdd(true)
+        .setMaxRowCount(rowsPerSegment)
+        .build();
   }
 
   private static <T> Sequence<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -629,12 +651,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(anIncrementalIndex, SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -649,12 +671,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(queryableIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.queryableIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     final Sequence<ResultRow> results = GroupByBenchmark.runQuery(factory, runner, query);
@@ -669,12 +691,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndexX(Blackhole blackhole)
+  public void queryMultiQueryableIndexX(Blackhole blackhole, QueryableIndexState state)
   {
     QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            factory.mergeRunners(executorService, makeMultiRunners())
+            factory.mergeRunners(state.executorService, makeMultiRunners(state))
         ),
         (QueryToolChest) toolChest
     );
@@ -691,12 +713,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndexTTFR(Blackhole blackhole) throws IOException
+  public void queryMultiQueryableIndexTTFR(Blackhole blackhole, QueryableIndexState state) throws IOException
   {
     QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            factory.mergeRunners(executorService, makeMultiRunners())
+            factory.mergeRunners(state.executorService, makeMultiRunners(state))
         ),
         (QueryToolChest) toolChest
     );
@@ -709,12 +731,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole)
+  public void queryMultiQueryableIndexWithSpilling(Blackhole blackhole, QueryableIndexState state)
   {
     QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            factory.mergeRunners(executorService, makeMultiRunners())
+            factory.mergeRunners(state.executorService, makeMultiRunners(state))
         ),
         (QueryToolChest) toolChest
     );
@@ -735,12 +757,12 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndexWithSpillingTTFR(Blackhole blackhole) throws IOException
+  public void queryMultiQueryableIndexWithSpillingTTFR(Blackhole blackhole, QueryableIndexState state) throws IOException
   {
     QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
     QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
         toolChest.mergeResults(
-            factory.mergeRunners(executorService, makeMultiRunners())
+            factory.mergeRunners(state.executorService, makeMultiRunners(state))
         ),
         (QueryToolChest) toolChest
     );
@@ -756,7 +778,7 @@ public class GroupByBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndexWithSerde(Blackhole blackhole)
+  public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableIndexState state)
   {
     QueryToolChest<ResultRow, GroupByQuery> toolChest = factory.getToolchest();
     //noinspection unchecked
@@ -766,7 +788,7 @@ public class GroupByBenchmark
                 new DefaultObjectMapper(new SmileFactory()),
                 ResultRow.class,
                 toolChest.mergeResults(
-                    factory.mergeRunners(executorService, makeMultiRunners())
+                    factory.mergeRunners(state.executorService, makeMultiRunners(state))
                 )
             )
         ),
@@ -778,15 +800,15 @@ public class GroupByBenchmark
     blackhole.consume(results);
   }
 
-  private List<QueryRunner<ResultRow>> makeMultiRunners()
+  private List<QueryRunner<ResultRow>> makeMultiRunners(QueryableIndexState state)
   {
     List<QueryRunner<ResultRow>> runners = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < state.numSegments; i++) {
       String segmentName = "qIndex " + i;
       QueryRunner<ResultRow> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           SegmentId.dummy(segmentName),
-          new QueryableIndexSegment(queryableIndexes.get(i), SegmentId.dummy(segmentName))
+          new QueryableIndexSegment(state.queryableIndexes.get(i), SegmentId.dummy(segmentName))
       );
       runners.add(factory.getToolchest().preMergeQueryDecoration(runner));
     }
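
    The GroupByBenchmark hunks above illustrate the pattern this commit applies to every
    benchmark below: shared setup stays on the enclosing class, while index-type-specific
    setup moves into nested JMH @State classes that are injected into @Benchmark methods as
    parameters. JMH instantiates a state only for benchmarks that declare it, and its @Param
    values (such as indexType) join the run matrix and can be pinned from the JMH command
    line with -p. A minimal, self-contained sketch of the injection mechanics (all names
    here are illustrative, not Druid classes):

        import org.openjdk.jmh.annotations.Benchmark;
        import org.openjdk.jmh.annotations.Param;
        import org.openjdk.jmh.annotations.Scope;
        import org.openjdk.jmh.annotations.Setup;
        import org.openjdk.jmh.annotations.State;
        import org.openjdk.jmh.annotations.TearDown;
        import org.openjdk.jmh.infra.Blackhole;

        @State(Scope.Benchmark)
        public class StateInjectionSketch
        {
          @Param({"100"})
          int rows;

          @State(Scope.Benchmark)
          public static class IndexState
          {
            @Param({"onheap", "offheap"})
            String indexType;

            String index;

            // JMH injects the enclosing benchmark state into nested @Setup
            // methods, so shared fields like `rows` are visible here.
            @Setup
            public void setup(StateInjectionSketch global)
            {
              index = indexType + ":" + global.rows; // stand-in for real index construction
            }

            @TearDown
            public void tearDown()
            {
              index = null; // close/release real resources here
            }
          }

          // The state is set up only for benchmarks that take it as a parameter.
          @Benchmark
          public void queryIndex(Blackhole blackhole, IndexState state)
          {
            blackhole.consume(state.index);
          }
        }
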
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index b543076..10c31b5 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -19,11 +19,11 @@
 
 package org.apache.druid.benchmark.query;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
@@ -67,7 +67,10 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -102,12 +105,6 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class ScanBenchmark
 {
-  @Param({"2", "4"})
-  private int numSegments;
-
-  @Param({"2"})
-  private int numProcessingThreads;
-
   @Param({"200000"})
   private int rowsPerSegment;
 
@@ -121,6 +118,7 @@ public class ScanBenchmark
   private static ScanQuery.Order ordering;
 
   private static final Logger log = new Logger(ScanBenchmark.class);
+  private static final int RNG_SEED = 9999;
   private static final ObjectMapper JSON_MAPPER;
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
@@ -129,16 +127,12 @@ public class ScanBenchmark
     NullHandling.initializeForTests();
   }
 
-  private List<IncrementalIndex> incIndexes;
-  private List<QueryableIndex> qIndexes;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private DataGenerator generator;
   private QueryRunnerFactory factory;
   private GeneratorSchemaInfo schemaInfo;
   private Druids.ScanQueryBuilder queryBuilder;
   private ScanQuery query;
-  private File tmpDir;
-
-  private ExecutorService executorService;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -242,15 +236,16 @@ public class ScanBenchmark
                  .order(ordering);
   }
 
+  /**
+   * Set up everything common to both the incremental-index and the queryable-index benchmarks.
+   */
   @Setup
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool");
-
     setupQueries();
 
     String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -262,43 +257,12 @@ public class ScanBenchmark
     queryBuilder.limit(limit);
     query = queryBuilder.build();
 
-    incIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment " + i);
-      DataGenerator gen = new DataGenerator(
-          schemaInfo.getColumnSchemas(),
-          System.currentTimeMillis(),
-          schemaInfo.getDataInterval(),
-          rowsPerSegment
-      );
-
-      IncrementalIndex incIndex = makeIncIndex();
-
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
-      incIndexes.add(incIndex);
-    }
-
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
-    qIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndexes.get(i),
-          tmpDir,
-          new IndexSpec(),
-          null
-      );
-
-      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
-      qIndexes.add(qIndex);
-    }
+    generator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
 
     final ScanQueryConfig config = new ScanQueryConfig().setLegacy(false);
     factory = new ScanQueryRunnerFactory(
@@ -311,18 +275,100 @@ public class ScanBenchmark
     );
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  /**
+   * Set up and tear down everything specific to benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(ScanBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex();
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Set up and tear down everything specific to benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
+  {
+    @Param({"2", "4"})
+    private int numSegments;
+
+    @Param({"2"})
+    private int numProcessingThreads;
+
+    private ExecutorService executorService;
+    private File qIndexesDir;
+    private List<QueryableIndex> qIndexes;
+
+    @Setup
+    public void setup(ScanBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      executorService = Execs.multiThreaded(numProcessingThreads, "ScanThreadPool");
+
+      qIndexesDir = FileUtils.createTempDir();
+      qIndexes = new ArrayList<>();
+
+      for (int i = 0; i < numSegments; i++) {
+        log.info("Generating rows for segment " + i);
+
+        IncrementalIndex<?> incIndex = global.makeIncIndex();
+        global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+        File indexFile = INDEX_MERGER_V9.persist(
+            incIndex,
+            new File(qIndexesDir, String.valueOf(i)),
+            new IndexSpec(),
+            null
+        );
+        incIndex.close();
+
+        qIndexes.add(INDEX_IO.loadIndex(indexFile));
+      }
+    }
+
+    @TearDown
+    public void tearDown() throws IOException
+    {
+      for (QueryableIndex index : qIndexes) {
+        if (index != null) {
+          index.close();
+        }
+      }
+      if (qIndexesDir != null) {
+        FileUtils.deleteDirectory(qIndexesDir);
+      }
+    }
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -340,12 +386,12 @@ public class ScanBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<ScanResultValue> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     Query effectiveQuery = query
@@ -372,12 +418,12 @@ public class ScanBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     Query effectiveQuery = query
@@ -404,17 +450,17 @@ public class ScanBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndex(Blackhole blackhole)
+  public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     List<SegmentDescriptor> segmentDescriptors = new ArrayList<>();
     List<QueryRunner<Row>> runners = new ArrayList<>();
     QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < state.numSegments; i++) {
       String segmentName = "qIndex";
       final QueryRunner<Result<ScanResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           SegmentId.dummy(segmentName),
-          new QueryableIndexSegment(qIndexes.get(i), SegmentId.dummy(segmentName, i))
+          new QueryableIndexSegment(state.qIndexes.get(i), SegmentId.dummy(segmentName, i))
       );
       segmentDescriptors.add(
           new SegmentDescriptor(
@@ -428,7 +474,7 @@ public class ScanBenchmark
 
     QueryRunner theRunner = toolChest.postMergeQueryDecoration(
         new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, runners)),
+            toolChest.mergeResults(factory.mergeRunners(state.executorService, runners)),
             toolChest
         )
     );
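
    In the new ScanBenchmark above, the indexType parameter is resolved through
    IncrementalIndexCreator.parseIndexType into an AppendableIndexSpec, and the spec's
    builder replaces the removed buildOnheap() call, so one code path can produce either
    index implementation. A sketch of the resulting construction sequence (assuming
    schemaInfo and rowsPerSegment are in scope as in the benchmark):

        // parseIndexType can throw JsonProcessingException; callers declare or wrap it.
        AppendableIndexSpec spec = IncrementalIndexCreator.parseIndexType("offheap");
        IncrementalIndex<?> incIndex = spec.builder()
            .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
            .setMaxRowCount(rowsPerSegment)
            .build();
        // ... populate and query the index, then release its buffers:
        incIndex.close();
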
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index 680a179f..2060591 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -20,12 +20,12 @@
 
 package org.apache.druid.benchmark.query;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
@@ -74,7 +74,10 @@ import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -108,9 +111,6 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class SearchBenchmark
 {
-  @Param({"1"})
-  private int numSegments;
-
   @Param({"750000"})
   private int rowsPerSegment;
 
@@ -121,6 +121,7 @@ public class SearchBenchmark
   private int limit;
 
   private static final Logger log = new Logger(SearchBenchmark.class);
+  private static final int RNG_SEED = 9999;
   private static final IndexMergerV9 INDEX_MERGER_V9;
   private static final IndexIO INDEX_IO;
   public static final ObjectMapper JSON_MAPPER;
@@ -129,16 +130,12 @@ public class SearchBenchmark
     NullHandling.initializeForTests();
   }
 
-  private List<IncrementalIndex> incIndexes;
-  private List<QueryableIndex> qIndexes;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private DataGenerator generator;
   private QueryRunnerFactory factory;
   private GeneratorSchemaInfo schemaInfo;
   private Druids.SearchQueryBuilder queryBuilder;
   private SearchQuery query;
-  private File tmpDir;
-
-  private ExecutorService executorService;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -312,15 +309,16 @@ public class SearchBenchmark
                  .filters(new AndDimFilter(dimFilters));
   }
 
+  /**
+   * Set up everything common to both the incremental-index and the queryable-index benchmarks.
+   */
   @Setup
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
-
     setupQueries();
 
     String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -332,43 +330,12 @@ public class SearchBenchmark
     queryBuilder.limit(limit);
     query = queryBuilder.build();
 
-    incIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment " + i);
-      DataGenerator gen = new DataGenerator(
-          schemaInfo.getColumnSchemas(),
-          System.currentTimeMillis(),
-          schemaInfo.getDataInterval(),
-          rowsPerSegment
-      );
-
-      IncrementalIndex incIndex = makeIncIndex();
-
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
-      incIndexes.add(incIndex);
-    }
-
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
-    qIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndexes.get(i),
-          tmpDir,
-          new IndexSpec(),
-          null
-      );
-
-      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
-      qIndexes.add(qIndex);
-    }
+    generator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
 
     final SearchQueryConfig config = new SearchQueryConfig().withOverrides(query);
     factory = new SearchQueryRunnerFactory(
@@ -378,18 +345,97 @@ public class SearchBenchmark
     );
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  /**
+   * Set up and tear down everything specific to benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
+  {
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(SearchBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex();
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Set up and tear down everything specific to benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    @Param({"1"})
+    private int numSegments;
+
+    private ExecutorService executorService;
+    private File qIndexesDir;
+    private List<QueryableIndex> qIndexes;
+
+    @Setup
+    public void setup(SearchBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
+
+      qIndexesDir = FileUtils.createTempDir();
+      qIndexes = new ArrayList<>();
+
+      for (int i = 0; i < numSegments; i++) {
+        log.info("Generating rows for segment " + i);
+
+        IncrementalIndex<?> incIndex = global.makeIncIndex();
+        global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+        File indexFile = INDEX_MERGER_V9.persist(
+            incIndex,
+            new File(qIndexesDir, String.valueOf(i)),
+            new IndexSpec(),
+            null
+        );
+        incIndex.close();
+
+        qIndexes.add(INDEX_IO.loadIndex(indexFile));
+      }
+    }
+
+    @TearDown
+    public void tearDown() throws IOException
+    {
+      for (QueryableIndex index : qIndexes) {
+        if (index != null) {
+          index.close();
+        }
+      }
+      if (qIndexesDir != null) {
+        FileUtils.deleteDirectory(qIndexesDir);
+      }
+    }
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -407,12 +453,12 @@ public class SearchBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<SearchHit> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
@@ -422,12 +468,12 @@ public class SearchBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     List<Result<SearchResultValue>> results = SearchBenchmark.runQuery(factory, runner, query);
@@ -438,23 +484,23 @@ public class SearchBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndex(Blackhole blackhole)
+  public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     List<QueryRunner<Row>> singleSegmentRunners = new ArrayList<>();
     QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < state.numSegments; i++) {
       String segmentName = "qIndex " + i;
       final QueryRunner<Result<SearchResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           SegmentId.dummy(segmentName),
-          new QueryableIndexSegment(qIndexes.get(i), SegmentId.dummy(segmentName))
+          new QueryableIndexSegment(state.qIndexes.get(i), SegmentId.dummy(segmentName))
       );
       singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
     }
 
     QueryRunner theRunner = toolChest.postMergeQueryDecoration(
         new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+            toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
             toolChest
         )
     );
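
    Note also how per-segment data generation changed in the benchmarks above: instead of
    constructing a fresh DataGenerator per segment, a single generator seeded with the fixed
    RNG_SEED is re-seeded deterministically per segment, so segment contents are reproducible
    across runs and across the two state classes. A sketch of the new shape (assuming the
    benchmark's fields are in scope):

        DataGenerator generator = new DataGenerator(
            schemaInfo.getColumnSchemas(),
            RNG_SEED,
            schemaInfo.getDataInterval(),
            rowsPerSegment
        );
        for (int i = 0; i < numSegments; i++) {
          IncrementalIndex<?> incIndex = makeIncIndex();
          // Segment i always receives the same rows: its seed is RNG_SEED + i.
          generator.reset(RNG_SEED + i).addToIndex(incIndex, rowsPerSegment);
        }
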
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index 875192a..98b3dd5 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -19,9 +19,9 @@
 
 package org.apache.druid.benchmark.query;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.Intervals;
@@ -68,7 +68,10 @@ import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -102,15 +105,15 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class TimeseriesBenchmark
 {
-  @Param({"1"})
-  private int numSegments;
-
   @Param({"750000"})
   private int rowsPerSegment;
 
   @Param({"basic.A", "basic.timeFilterNumeric", "basic.timeFilterAlphanumeric", "basic.timeFilterByInterval"})
   private String schemaAndQuery;
 
+  @Param({"true", "false"})
+  private boolean descending;
+
   private static final Logger log = new Logger(TimeseriesBenchmark.class);
   private static final int RNG_SEED = 9999;
   private static final IndexMergerV9 INDEX_MERGER_V9;
@@ -121,16 +124,12 @@ public class TimeseriesBenchmark
     NullHandling.initializeForTests();
   }
 
-  private List<IncrementalIndex> incIndexes;
-  private List<QueryableIndex> qIndexes;
-  private File tmpDir;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private DataGenerator generator;
   private QueryRunnerFactory factory;
   private GeneratorSchemaInfo schemaInfo;
   private TimeseriesQuery query;
 
-  private ExecutorService executorService;
-
   static {
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
@@ -171,7 +170,7 @@ public class TimeseriesBenchmark
                 .granularity(Granularities.ALL)
                 .intervals(intervalSpec)
                 .aggregators(queryAggs)
-                .descending(false)
+                .descending(descending)
                 .build();
 
       basicQueries.put("A", queryA);
@@ -191,7 +190,7 @@ public class TimeseriesBenchmark
                 .granularity(Granularities.ALL)
                 .intervals(intervalSpec)
                 .aggregators(queryAggs)
-                .descending(false)
+                .descending(descending)
                 .build();
 
       basicQueries.put("timeFilterNumeric", timeFilterQuery);
@@ -211,7 +210,7 @@ public class TimeseriesBenchmark
                 .granularity(Granularities.ALL)
                 .intervals(intervalSpec)
                 .aggregators(queryAggs)
-                .descending(false)
+                .descending(descending)
                 .build();
 
       basicQueries.put("timeFilterAlphanumeric", timeFilterQuery);
@@ -228,7 +227,7 @@ public class TimeseriesBenchmark
                 .granularity(Granularities.ALL)
                 .intervals(intervalSpec)
                 .aggregators(queryAggs)
-                .descending(false)
+                .descending(descending)
                 .build();
 
       basicQueries.put("timeFilterByInterval", timeFilterQuery);
@@ -238,15 +237,16 @@ public class TimeseriesBenchmark
     SCHEMA_QUERY_MAP.put("basic", basicQueries);
   }
 
+  /**
+   * Set up everything common to both the incremental-index and the queryable-index benchmarks.
+   */
   @Setup
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
-
     setupQueries();
 
     String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -256,44 +256,12 @@ public class TimeseriesBenchmark
     schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
     query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
 
-    incIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment " + i);
-      DataGenerator gen = new DataGenerator(
-          schemaInfo.getColumnSchemas(),
-          RNG_SEED + i,
-          schemaInfo.getDataInterval(),
-          rowsPerSegment
-      );
-
-      IncrementalIndex incIndex = makeIncIndex();
-
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
-      log.info(rowsPerSegment + " rows generated");
-      incIndexes.add(incIndex);
-    }
-
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
-    qIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndexes.get(i),
-          tmpDir,
-          new IndexSpec(),
-          null
-      );
-
-      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
-      qIndexes.add(qIndex);
-    }
+    generator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
 
     factory = new TimeseriesQueryRunnerFactory(
         new TimeseriesQueryQueryToolChest(),
@@ -302,18 +270,97 @@ public class TimeseriesBenchmark
     );
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  /**
+   * Set up and tear down everything specific to benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(TimeseriesBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex();
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Set up and tear down everything specific to benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
+  {
+    @Param({"1"})
+    private int numSegments;
+
+    private ExecutorService executorService;
+    private File qIndexesDir;
+    private List<QueryableIndex> qIndexes;
+
+    @Setup
+    public void setup(TimeseriesBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
+
+      qIndexesDir = FileUtils.createTempDir();
+      qIndexes = new ArrayList<>();
+
+      for (int i = 0; i < numSegments; i++) {
+        log.info("Generating rows for segment " + i);
+
+        IncrementalIndex<?> incIndex = global.makeIncIndex();
+        global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+        File indexFile = INDEX_MERGER_V9.persist(
+            incIndex,
+            new File(qIndexesDir, String.valueOf(i)),
+            new IndexSpec(),
+            null
+        );
+        incIndex.close();
+
+        qIndexes.add(INDEX_IO.loadIndex(indexFile));
+      }
+    }
+
+    @TearDown
+    public void tearDown() throws IOException
+    {
+      for (QueryableIndex index : qIndexes) {
+        if (index != null) {
+          index.close();
+        }
+      }
+      if (qIndexesDir != null) {
+        FileUtils.deleteDirectory(qIndexesDir);
+      }
+    }
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -331,12 +378,12 @@ public class TimeseriesBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
@@ -346,12 +393,12 @@ public class TimeseriesBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     List<Result<TimeseriesResultValue>> results = TimeseriesBenchmark.runQuery(factory, runner, query);
@@ -361,12 +408,12 @@ public class TimeseriesBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryFilteredSingleQueryableIndex(Blackhole blackhole)
+  public void queryFilteredSingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     DimFilter filter = new SelectorDimFilter("dimSequential", "399", null);
@@ -379,23 +426,23 @@ public class TimeseriesBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndex(Blackhole blackhole)
+  public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     List<QueryRunner<Result<TimeseriesResultValue>>> singleSegmentRunners = new ArrayList<>();
     QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < state.numSegments; i++) {
       SegmentId segmentId = SegmentId.dummy("qIndex " + i);
       QueryRunner<Result<TimeseriesResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           segmentId,
-          new QueryableIndexSegment(qIndexes.get(i), segmentId)
+          new QueryableIndexSegment(state.qIndexes.get(i), segmentId)
       );
       singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
     }
 
     QueryRunner theRunner = toolChest.postMergeQueryDecoration(
         new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+            toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
             toolChest
         )
     );
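
    TimeseriesBenchmark also gains a descending parameter: the four basic queries previously
    hard-coded .descending(false), so only ascending cursors were ever measured; making it a
    @Param doubles the matrix to cover both scan directions. The pattern, abbreviated
    (intervalSpec and queryAggs as defined in the benchmark; the dataSource name is
    illustrative):

        @Param({"true", "false"})
        private boolean descending;

        // Every query in SCHEMA_QUERY_MAP now honors the parameter:
        TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
            .dataSource("blah")          // illustrative dataSource name
            .granularity(Granularities.ALL)
            .intervals(intervalSpec)     // as defined in the benchmark
            .aggregators(queryAggs)      // as defined in the benchmark
            .descending(descending)
            .build();
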
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index b290f30..6587024 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -19,10 +19,10 @@
 
 package org.apache.druid.benchmark.query;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.collections.StupidPool;
 import org.apache.druid.common.config.NullHandling;
-import org.apache.druid.data.input.InputRow;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -65,7 +65,10 @@ import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -99,9 +102,6 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 25)
 public class TopNBenchmark
 {
-  @Param({"1"})
-  private int numSegments;
-
   @Param({"750000"})
   private int rowsPerSegment;
 
@@ -121,16 +121,12 @@ public class TopNBenchmark
     NullHandling.initializeForTests();
   }
 
-  private List<IncrementalIndex> incIndexes;
-  private List<QueryableIndex> qIndexes;
-
+  private AppendableIndexSpec appendableIndexSpec;
+  private DataGenerator generator;
   private QueryRunnerFactory factory;
   private GeneratorSchemaInfo schemaInfo;
   private TopNQueryBuilder queryBuilder;
   private TopNQuery query;
-  private File tmpDir;
-
-  private ExecutorService executorService;
 
   static {
     JSON_MAPPER = new DefaultObjectMapper();
@@ -212,16 +208,16 @@ public class TopNBenchmark
     SCHEMA_QUERY_MAP.put("basic", basicQueries);
   }
 
-
+  /**
+   * Set up everything common to both the incremental-index and the queryable-index benchmarks.
+   */
   @Setup
-  public void setup() throws IOException
+  public void setup()
   {
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
 
-    executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
-
     setupQueries();
 
     String[] schemaQuery = schemaAndQuery.split("\\.");
@@ -233,44 +229,12 @@ public class TopNBenchmark
     queryBuilder.threshold(threshold);
     query = queryBuilder.build();
 
-    incIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      log.info("Generating rows for segment " + i);
-
-      DataGenerator gen = new DataGenerator(
-          schemaInfo.getColumnSchemas(),
-          RNG_SEED + i,
-          schemaInfo.getDataInterval(),
-          rowsPerSegment
-      );
-
-      IncrementalIndex incIndex = makeIncIndex();
-
-      for (int j = 0; j < rowsPerSegment; j++) {
-        InputRow row = gen.nextRow();
-        if (j % 10000 == 0) {
-          log.info(j + " rows generated.");
-        }
-        incIndex.add(row);
-      }
-      incIndexes.add(incIndex);
-    }
-
-    tmpDir = FileUtils.createTempDir();
-    log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-
-    qIndexes = new ArrayList<>();
-    for (int i = 0; i < numSegments; i++) {
-      File indexFile = INDEX_MERGER_V9.persist(
-          incIndexes.get(i),
-          tmpDir,
-          new IndexSpec(),
-          null
-      );
-
-      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
-      qIndexes.add(qIndex);
-    }
+    generator = new DataGenerator(
+        schemaInfo.getColumnSchemas(),
+        RNG_SEED,
+        schemaInfo.getDataInterval(),
+        rowsPerSegment
+    );
 
     factory = new TopNQueryRunnerFactory(
         new StupidPool<>(
@@ -284,18 +248,95 @@ public class TopNBenchmark
     );
   }
 
-  @TearDown
-  public void tearDown() throws IOException
+  /**
+   * Set up and tear down everything specific to benchmarking the incremental-index.
+   */
+  @State(Scope.Benchmark)
+  public static class IncrementalIndexState
   {
-    FileUtils.deleteDirectory(tmpDir);
+    @Param({"onheap", "offheap"})
+    private String indexType;
+
+    IncrementalIndex<?> incIndex;
+
+    @Setup
+    public void setup(TopNBenchmark global) throws JsonProcessingException
+    {
+      // Creates an AppendableIndexSpec that corresponds to the indexType parametrization.
+      // It is used in {@code global.makeIncIndex()} to instantiate an incremental-index of the specified type.
+      global.appendableIndexSpec = IncrementalIndexCreator.parseIndexType(indexType);
+      incIndex = global.makeIncIndex();
+      global.generator.addToIndex(incIndex, global.rowsPerSegment);
+    }
+
+    @TearDown
+    public void tearDown()
+    {
+      if (incIndex != null) {
+        incIndex.close();
+      }
+    }
+  }
+
+  /**
+   * Set up and tear down everything specific to benchmarking the queryable-index.
+   */
+  @State(Scope.Benchmark)
+  public static class QueryableIndexState
+  {
+    @Param({"1"})
+    private int numSegments;
+
+    private ExecutorService executorService;
+    private File qIndexesDir;
+    private List<QueryableIndex> qIndexes;
+
+    @Setup
+    public void setup(TopNBenchmark global) throws IOException
+    {
+      global.appendableIndexSpec = new OnheapIncrementalIndex.Spec();
+
+      executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
+
+      qIndexesDir = FileUtils.createTempDir();
+      qIndexes = new ArrayList<>();
+
+      for (int i = 0; i < numSegments; i++) {
+        log.info("Generating rows for segment " + i);
+
+        IncrementalIndex<?> incIndex = global.makeIncIndex();
+        global.generator.reset(RNG_SEED + i).addToIndex(incIndex, global.rowsPerSegment);
+
+        File indexFile = INDEX_MERGER_V9.persist(
+            incIndex,
+            new File(qIndexesDir, String.valueOf(i)),
+            new IndexSpec(),
+            null
+        );
+        incIndex.close();
+
+        qIndexes.add(INDEX_IO.loadIndex(indexFile));
+      }
+    }
+
+    @TearDown
+    public void tearDown() throws IOException
+    {
+      for (QueryableIndex index : qIndexes) {
+        if (index != null) {
+          index.close();
+        }
+      }
+      if (qIndexesDir != null) {
+        FileUtils.deleteDirectory(qIndexesDir);
+      }
+    }
   }
 
-  private IncrementalIndex makeIncIndex()
+  private IncrementalIndex<?> makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return appendableIndexSpec.builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
@@ -314,12 +355,12 @@ public class TopNBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleIncrementalIndex(Blackhole blackhole)
+  public void querySingleIncrementalIndex(Blackhole blackhole, IncrementalIndexState state)
   {
     QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("incIndex"),
-        new IncrementalIndexSegment(incIndexes.get(0), SegmentId.dummy("incIndex"))
+        new IncrementalIndexSegment(state.incIndex, SegmentId.dummy("incIndex"))
     );
 
     List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
@@ -329,12 +370,12 @@ public class TopNBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void querySingleQueryableIndex(Blackhole blackhole)
+  public void querySingleQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     final QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
         factory,
         SegmentId.dummy("qIndex"),
-        new QueryableIndexSegment(qIndexes.get(0), SegmentId.dummy("qIndex"))
+        new QueryableIndexSegment(state.qIndexes.get(0), SegmentId.dummy("qIndex"))
     );
 
     List<Result<TopNResultValue>> results = TopNBenchmark.runQuery(factory, runner, query);
@@ -344,23 +385,23 @@ public class TopNBenchmark
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void queryMultiQueryableIndex(Blackhole blackhole)
+  public void queryMultiQueryableIndex(Blackhole blackhole, QueryableIndexState state)
   {
     List<QueryRunner<Result<TopNResultValue>>> singleSegmentRunners = new ArrayList<>();
     QueryToolChest toolChest = factory.getToolchest();
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < state.numSegments; i++) {
       SegmentId segmentId = SegmentId.dummy("qIndex " + i);
       QueryRunner<Result<TopNResultValue>> runner = QueryBenchmarkUtil.makeQueryRunner(
           factory,
           segmentId,
-          new QueryableIndexSegment(qIndexes.get(i), segmentId)
+          new QueryableIndexSegment(state.qIndexes.get(i), segmentId)
       );
       singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
     }
 
     QueryRunner theRunner = toolChest.postMergeQueryDecoration(
         new FinalizeResultsQueryRunner<>(
-            toolChest.mergeResults(factory.mergeRunners(executorService, singleSegmentRunners)),
+            toolChest.mergeResults(factory.mergeRunners(state.executorService, singleSegmentRunners)),
             toolChest
         )
     );
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index 406e627..0208fe1 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -404,10 +405,10 @@ public class TimeCompareBenchmark
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
         .setMaxRowCount(rowsPerSegment)
-        .buildOnheap();
+        .build();
   }
 
   @Benchmark
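
    The hunks below this point are mechanical applications of the same replacement,
    following the removal of the redundant IncrementalIndex.Builder: each test swaps the
    abstract builder and its type-specific buildOnheap() terminal for the concrete
    OnheapIncrementalIndex.Builder with a plain build(). Side by side, using the schema and
    row limit from the distinct-count tests:

        // Before (IncrementalIndex.Builder is removed by this commit):
        IncrementalIndex index = new IncrementalIndex.Builder()
            .setIndexSchema(schema)
            .setMaxRowCount(1000)
            .buildOnheap();

        // After:
        IncrementalIndex index = new OnheapIncrementalIndex.Builder()
            .setIndexSchema(schema)
            .setMaxRowCount(1000)
            .build();
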
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
index 6c7db8e..3da07ff 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.After;
 import org.junit.Before;
@@ -78,7 +79,7 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
   @Test
   public void testGroupByWithDistinctCountAgg() throws Exception
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
@@ -87,7 +88,7 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
         )
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     String visitor_id = "visitor_id";
     String client_type = "client_type";
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
index 2cc0526..f553bfc 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.junit.Test;
@@ -50,7 +51,7 @@ public class DistinctCountTimeseriesQueryTest extends InitializedNullHandlingTes
   {
     TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
 
-    IncrementalIndex index = new IncrementalIndex.Builder()
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
@@ -58,7 +59,7 @@ public class DistinctCountTimeseriesQueryTest extends InitializedNullHandlingTes
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     String visitor_id = "visitor_id";
     String client_type = "client_type";
diff --git a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
index 7b14fba..ef1344c 100644
--- a/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
+++ b/extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.junit.After;
@@ -80,7 +81,7 @@ public class DistinctCountTopNQueryTest extends InitializedNullHandlingTest
   {
     TopNQueryEngine engine = new TopNQueryEngine(pool);
 
-    IncrementalIndex index = new IncrementalIndex.Builder()
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
@@ -88,7 +89,7 @@ public class DistinctCountTopNQueryTest extends InitializedNullHandlingTest
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     String visitor_id = "visitor_id";
     String client_type = "client_type";
diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
index 4d2164d..87286b4 100644
--- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
+++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnTestBase.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 
 import java.io.IOException;
@@ -62,10 +63,10 @@ public class MapVirtualColumnTestBase extends InitializedNullHandlingTest
         .build();
 
     return TestIndex.loadIncrementalIndex(
-        () -> new IncrementalIndex.Builder()
+        () -> new OnheapIncrementalIndex.Builder()
             .setIndexSchema(schema)
             .setMaxRowCount(10000)
-            .buildOnheap(),
+            .build(),
         input,
         parser
     );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 9d30023..2831734 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -45,6 +45,7 @@ import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
@@ -230,8 +231,8 @@ public class InputSourceSampler
         .withRollup(dataSchema.getGranularitySpec().isRollup())
         .build();
 
-    return new IncrementalIndex.Builder().setIndexSchema(schema)
+    return new OnheapIncrementalIndex.Builder().setIndexSchema(schema)
                                          .setMaxRowCount(samplerConfig.getNumRows())
-                                         .buildOnheap();
+                                         .build();
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 6ab4161..01a1d24 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.loading.LocalDataSegmentPuller;
 import org.apache.druid.segment.loading.LocalLoadSpec;
 import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
@@ -196,10 +197,10 @@ public class IngestSegmentFirehoseFactoryTest
             new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME)
         )
         .build();
-    final IncrementalIndex index = new IncrementalIndex.Builder()
+    final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(MAX_ROWS * MAX_SHARD_NUMBER)
-        .buildOnheap();
+        .build();
 
     for (Integer i = 0; i < MAX_ROWS; ++i) {
       index.add(ROW_PARSER.parseBatch(buildRow(i.longValue())).get(0));
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index 06ebc56..400f7da 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -250,10 +251,10 @@ public class IngestSegmentFirehoseFactoryTimelineTest
         .withDimensionsSpec(ROW_PARSER)
         .withMetrics(new LongSumAggregatorFactory(METRICS[0], METRICS[0]))
         .build();
-    final IncrementalIndex index = new IncrementalIndex.Builder()
+    final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(rows.length)
-        .buildOnheap();
+        .build();
 
     for (InputRow row : rows) {
       try {
diff --git a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
index f2fac21..f9aae18 100644
--- a/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
+++ b/processing/src/main/java/org/apache/druid/segment/generator/DataGenerator.java
@@ -24,17 +24,22 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IndexSizeExceededException;
 import org.joda.time.Interval;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class DataGenerator
 {
   private final List<GeneratorColumnSchema> columnSchemas;
-  private final long seed;
 
   private List<ColumnValueGenerator> columnGenerators;
   private final long startTime;
@@ -46,6 +51,8 @@ public class DataGenerator
   private int timeCounter;
   private List<String> dimensionNames;
 
+  private static final Logger log = new Logger(DataGenerator.class);
+
   public DataGenerator(
       List<GeneratorColumnSchema> columnSchemas,
       final long seed,
@@ -55,7 +62,6 @@ public class DataGenerator
   )
   {
     this.columnSchemas = columnSchemas;
-    this.seed = seed;
 
     this.startTime = startTime;
     this.endTime = Long.MAX_VALUE;
@@ -63,7 +69,7 @@ public class DataGenerator
     this.timestampIncrement = timestampIncrement;
     this.currentTime = startTime;
 
-    init();
+    reset(seed);
   }
 
   public DataGenerator(
@@ -74,7 +80,6 @@ public class DataGenerator
   )
   {
     this.columnSchemas = columnSchemas;
-    this.seed = seed;
 
     this.startTime = interval.getStartMillis();
     this.endTime = interval.getEndMillis() - 1;
@@ -85,7 +90,7 @@ public class DataGenerator
     this.timestampIncrement = timeDelta / (numRows * 1.0);
     this.numConsecutiveTimestamps = 0;
 
-    init();
+    reset(seed);
   }
 
   public InputRow nextRow()
@@ -98,7 +103,12 @@ public class DataGenerator
     return row;
   }
 
-  private void init()
+  /**
+   * Reset this generator to start from the beginning of the interval with a new seed.
+   *
+   * @param seed the new seed to generate rows from
+   * @return this generator, to allow chaining calls
+   */
+  public DataGenerator reset(long seed)
   {
     this.timeCounter = 0;
     this.currentTime = startTime;
@@ -126,6 +136,8 @@ public class DataGenerator
             }
         )
     );
+
+    return this;
   }
 
   private long nextTimestamp()
@@ -143,4 +155,71 @@ public class DataGenerator
     }
   }
 
+  /**
+   * Create a Java Stream of InputRow objects backed by this DataGenerator.
+   * The stream logs its progress once every 10,000 rows.
+   *
+   * @param numOfRows the number of rows to generate
+   * @return a sequential stream of the generated rows
+   */
+  private Stream<InputRow> generator(int numOfRows)
+  {
+    return Stream.generate(
+        new Supplier<InputRow>()
+        {
+          int i = 0;
+
+          @Override
+          public InputRow get()
+          {
+            InputRow row = DataGenerator.this.nextRow();
+            i++;
+            if (i % 10_000 == 0) {
+              log.info("%,d/%,d rows generated.", i, numOfRows);
+            }
+            return row;
+          }
+        }
+    ).limit(numOfRows);
+  }
+
+  /**
+   * Add rows from any stream to an index.
+   *
+   * @param stream the stream of rows to add
+   * @param index the index to add rows to
+   */
+  public static void addStreamToIndex(Stream<InputRow> stream, IncrementalIndex<?> index)
+  {
+    stream.forEachOrdered(row -> {
+      try {
+        index.add(row);
+      }
+      catch (IndexSizeExceededException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /**
+   * Add rows from this generator to an index.
+   *
+   * @param index the index to add rows to
+   * @param numOfRows the number of rows to add
+   */
+  public void addToIndex(IncrementalIndex<?> index, int numOfRows)
+  {
+    addStreamToIndex(generator(numOfRows), index);
+  }
+
+  /**
+   * Collect rows from this generator into a list.
+   *
+   * @param numOfRows the number of rows to put in the list
+   * @return a List of InputRow
+   */
+  public List<InputRow> toList(int numOfRows)
+  {
+    return generator(numOfRows).collect(Collectors.toList());
+  }
 }
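
To make the reworked DataGenerator API above easier to review, here is a minimal
usage sketch. It assumes the predefined "basic" schema exposed by
GeneratorBasicSchemas in the same generator package; the seeds and row counts are
illustrative placeholders, not values taken from this patch:

    import org.apache.druid.data.input.InputRow;
    import org.apache.druid.query.aggregation.CountAggregatorFactory;
    import org.apache.druid.segment.generator.DataGenerator;
    import org.apache.druid.segment.generator.GeneratorBasicSchemas;
    import org.apache.druid.segment.generator.GeneratorSchemaInfo;
    import org.apache.druid.segment.incremental.IncrementalIndex;
    import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

    import java.util.List;

    public class DataGeneratorSketch
    {
      public static void main(String[] args) throws Exception
      {
        GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");

        DataGenerator gen = new DataGenerator(
            schemaInfo.getColumnSchemas(),
            1234L,                          // seed (illustrative)
            schemaInfo.getDataInterval(),
            10_000                          // number of rows (illustrative)
        );

        // Materialize rows into a list...
        List<InputRow> rows = gen.toList(10_000);

        // ...or re-seed the same generator and feed an incremental index directly;
        // reset() returns the generator itself, so calls can be chained.
        IncrementalIndex<?> index = new OnheapIncrementalIndex.Builder()
            .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
            .setMaxRowCount(20_000)
            .build();
        gen.reset(5678L).addToIndex(index, 10_000);

        // addStreamToIndex() works with any Stream<InputRow>, e.g. a replay of the list.
        DataGenerator.addStreamToIndex(rows.stream(), index);

        index.close();
      }
    }
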
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
index 220f0e3..faf164f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -35,7 +35,7 @@ public abstract class AppendableIndexBuilder
   protected int maxRowCount = 0;
   protected long maxBytesInMemory = 0;
 
-  protected final Logger log = new Logger(this.getClass().getName());
+  protected final Logger log = new Logger(this.getClass());
 
   public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index bbef755..3e3ab54 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -321,66 +321,6 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
     }
   }
 
-  /**
-   * This class exists only for backward compatibility, to reduce the number of modified lines.
-   */
-  public static class Builder extends OnheapIncrementalIndex.Builder
-  {
-    @Override
-    public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
-    {
-      return (Builder) super.setIndexSchema(incrementalIndexSchema);
-    }
-
-    @Override
-    public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
-    {
-      return (Builder) super.setSimpleTestingIndexSchema(metrics);
-    }
-
-    @Override
-    public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
-    {
-      return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
-    }
-
-    @Override
-    public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
-    {
-      return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
-    }
-
-    @Override
-    public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
-    {
-      return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
-    }
-
-    @Override
-    public Builder setSortFacts(final boolean sortFacts)
-    {
-      return (Builder) super.setSortFacts(sortFacts);
-    }
-
-    @Override
-    public Builder setMaxRowCount(final int maxRowCount)
-    {
-      return (Builder) super.setMaxRowCount(maxRowCount);
-    }
-
-    @Override
-    public Builder setMaxBytesInMemory(final long maxBytesInMemory)
-    {
-      return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
-    }
-
-    public OnheapIncrementalIndex buildOnheap()
-    {
-      return (OnheapIncrementalIndex) build();
-    }
-  }
-
-
   public abstract FactsHolder getFacts();
 
   public abstract boolean canAppendRow();
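
For call sites not touched by this patch, removing IncrementalIndex.Builder is a
mechanical migration. A before/after sketch, using the same builder chain that
appears throughout the test diffs below:

    // Before (compiled against the shim removed above):
    IncrementalIndex before = new IncrementalIndex.Builder()
        .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
        .setMaxRowCount(1000)
        .buildOnheap();

    // After: use the concrete builder directly. The setters live on the shared
    // AppendableIndexBuilder parent, and build() replaces buildOnheap().
    IncrementalIndex after = new OnheapIncrementalIndex.Builder()
        .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
        .setMaxRowCount(1000)
        .build();
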
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index b3cdabc..a74f94f 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -150,18 +150,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
       boolean skipMaxRowsInMemoryCheck // ignored, we always want to check this for offheap
   ) throws IndexSizeExceededException
   {
-    ByteBuffer aggBuffer;
-    int bufferIndex;
-    int bufferOffset;
-
     synchronized (this) {
       final AggregatorFactory[] metrics = getMetrics();
       final int priorIndex = facts.getPriorIndex(key);
       if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
         final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
-        bufferIndex = indexAndOffset[0];
-        bufferOffset = indexAndOffset[1];
-        aggBuffer = aggBuffers.get(bufferIndex).get();
+        ByteBuffer aggBuffer = aggBuffers.get(indexAndOffset[0]).get();
+        return aggregate(row, rowContainer, aggBuffer, indexAndOffset[1]);
       } else {
         if (metrics.length > 0 && getAggs()[0] == null) {
           // note: creation of Aggregators is done lazily when at least one row from input is available
@@ -174,7 +169,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
           rowContainer.set(null);
         }
 
-        bufferIndex = aggBuffers.size() - 1;
+        int bufferIndex = aggBuffers.size() - 1;
         ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
         int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty()
                                               ? null
@@ -184,7 +179,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
           throw new ISE("last row's aggregate's buffer and last buffer index must be same");
         }
 
-        bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
+        int bufferOffset = aggsTotalSize + (lastAggregatorsIndexAndOffset != null ? lastAggregatorsIndexAndOffset[1] : 0);
+        ByteBuffer aggBuffer;
         if (lastBuffer != null &&
             lastBuffer.capacity() - bufferOffset >= aggsTotalSize) {
           aggBuffer = lastBuffer;
@@ -207,8 +203,9 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
 
         final int rowIndex = indexIncrement.getAndIncrement();
 
-        // note that indexAndOffsets must be updated before facts, because as soon as we update facts
-        // concurrent readers get hold of it and might ask for newly added row
+        // note that we must update indexAndOffsets and the aggregators' buffers before facts, because as soon as we
+        // update facts, concurrent readers get hold of it and might ask for the newly added row
+        AddToFactsResult res = aggregate(row, rowContainer, aggBuffer, bufferOffset);
         indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
         final int prev = facts.putIfAbsent(key, rowIndex);
         if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
@@ -216,12 +213,22 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
         } else {
           throw new ISE("Unexpected state: Concurrent fact addition.");
         }
+
+        return res;
       }
     }
+  }
 
-    rowContainer.set(row);
-
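+  /**
+   * Aggregate the given row into the aggregators' buffer at the given offset.
+   * Used both when a row is added for the first time and when a later row
+   * rolls up into an existing entry.
+   */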
+  public AddToFactsResult aggregate(
+      InputRow row,
+      ThreadLocal<InputRow> rowContainer,
+      ByteBuffer aggBuffer,
+      int bufferOffset
+  )
+  {
     final List<String> parseExceptionMessages = new ArrayList<>();
+
+    rowContainer.set(row);
     for (int i = 0; i < getMetrics().length; i++) {
       final BufferAggregator agg = getAggs()[i];
 
@@ -237,9 +244,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
       }
     }
     rowContainer.set(null);
+
     return new AddToFactsResult(getNumEntries().get(), 0, parseExceptionMessages);
   }
 
   @Override
   public int getLastRowIndex()
   {
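
The reordering above fixes the bug noted in the commit message: previously the
new row was published to facts inside the critical section, while its values
were aggregated into the offheap buffer only afterwards, outside the lock, so a
concurrent reader that found the row in facts could observe a slot into which
the first row had not yet been aggregated. The patch calls aggregate() before
facts.putIfAbsent(). A minimal sketch of that publish-after-initialize
invariant, with hypothetical stand-ins for Druid's internals:

    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class PublishAfterInit
    {
      private final List<int[]> indexAndOffsets = new CopyOnWriteArrayList<>();
      private final ConcurrentHashMap<String, Integer> facts = new ConcurrentHashMap<>();

      public void add(String key, int rowIndex, ByteBuffer aggBuffer, int bufferOffset)
      {
        // 1) Aggregate into the buffer slot first...
        aggBuffer.putLong(bufferOffset, 1L);
        // 2) ...then record where the slot lives...
        indexAndOffsets.add(new int[]{0, bufferOffset});
        // 3) ...and only then publish the row: a reader that finds `key` in
        //    facts is guaranteed to see a fully written slot.
        facts.putIfAbsent(key, rowIndex);
      }
    }
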
diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
index d6d27b8..ee58422 100644
--- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
@@ -321,10 +322,10 @@ public class DoubleStorageTest
         )
         .build();
 
-    final IncrementalIndex index = new IncrementalIndex.Builder()
+    final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+        .build();
 
 
     getStreamOfEvents().forEach(o -> {
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index 33e16db..be6b207 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -63,6 +63,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -140,10 +141,10 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
   @Before
   public void setup() throws Exception
   {
-    incrementalIndex = new IncrementalIndex.Builder()
+    incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .build();
 
     StringInputRowParser parser = new StringInputRowParser(
         new CSVParseSpec(
@@ -183,10 +184,10 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
         "UTF-8"
     );
 
-    incrementalIndexNullSampler = new IncrementalIndex.Builder()
+    incrementalIndexNullSampler = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .build();
 
     String[] rowsNullSampler = new String[]{
         "{\"time\":\"2011-01-13T00:00:00.000Z\",\"product\":\"product_1\",\"tags\":[],\"othertags\":[\"u1\", \"u2\"]}",
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index c599333..4a7df06 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.rules.TemporaryFolder;
@@ -479,7 +480,7 @@ public class AggregationTestHelper implements Closeable
     List<File> toMerge = new ArrayList<>();
 
     try {
-      index = new IncrementalIndex.Builder()
+      index = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(minTimestamp)
@@ -491,7 +492,7 @@ public class AggregationTestHelper implements Closeable
           )
           .setDeserializeComplexMetrics(deserializeComplexMetrics)
           .setMaxRowCount(maxRowCount)
-          .buildOnheap();
+          .build();
 
       while (rows.hasNext()) {
         Object row = rows.next();
@@ -500,7 +501,7 @@ public class AggregationTestHelper implements Closeable
           toMerge.add(tmp);
           indexMerger.persist(index, tmp, new IndexSpec(), null);
           index.close();
-          index = new IncrementalIndex.Builder()
+          index = new OnheapIncrementalIndex.Builder()
               .setIndexSchema(
                   new IncrementalIndexSchema.Builder()
                       .withMinTimestamp(minTimestamp)
@@ -512,7 +513,7 @@ public class AggregationTestHelper implements Closeable
               )
               .setDeserializeComplexMetrics(deserializeComplexMetrics)
               .setMaxRowCount(maxRowCount)
-              .buildOnheap();
+              .build();
         }
         if (row instanceof String && parser instanceof StringInputRowParser) {
           //Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
@@ -570,7 +571,7 @@ public class AggregationTestHelper implements Closeable
       boolean rollup
   ) throws Exception
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(minTimestamp)
@@ -582,7 +583,7 @@ public class AggregationTestHelper implements Closeable
         )
         .setDeserializeComplexMetrics(deserializeComplexMetrics)
         .setMaxRowCount(maxRowCount)
-        .buildOnheap();
+        .build();
 
     while (rows.hasNext()) {
       Object row = rows.next();
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
index 445c6c9..3f83308 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
@@ -69,7 +70,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
     final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
     ComplexMetrics.registerSerde(serde.getTypeName(), serde);
 
-    incrementalIndex = new IncrementalIndex.Builder()
+    incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
@@ -78,7 +79,7 @@ public class StringFirstTimeseriesQueryTest extends InitializedNullHandlingTest
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     incrementalIndex.add(
         new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
index 7765ec4..ae79cb5 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.joda.time.DateTime;
 import org.junit.Before;
@@ -68,7 +69,7 @@ public class StringLastTimeseriesQueryTest
     final SerializablePairLongStringSerde serde = new SerializablePairLongStringSerde();
     ComplexMetrics.registerSerde(serde.getTypeName(), serde);
 
-    incrementalIndex = new IncrementalIndex.Builder()
+    incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withQueryGranularity(Granularities.SECOND)
@@ -77,7 +78,7 @@ public class StringLastTimeseriesQueryTest
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     incrementalIndex.add(
         new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
index d22ba99..5ac6b58 100644
--- a/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.query.context.ConcurrentResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.LogicalSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
@@ -114,10 +115,10 @@ public class DataSourceMetadataQueryTest
   @Test
   public void testMaxIngestedEventTime() throws Exception
   {
-    final IncrementalIndex rtIndex = new IncrementalIndex.Builder()
+    final IncrementalIndex rtIndex = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner(
         new DataSourceMetadataQueryRunnerFactory(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index a2e994e..f22701d 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -73,6 +73,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.SegmentId;
@@ -133,7 +134,7 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
 
   private IncrementalIndex makeIncIndex(boolean withRollup)
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withDimensionsSpec(new DimensionsSpec(
@@ -149,7 +150,7 @@ public class GroupByLimitPushDownInsufficientBufferTest extends InitializedNullH
         )
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
   @Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 357a1e2..3c62e11 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -79,6 +79,7 @@ import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -140,7 +141,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
 
   private IncrementalIndex makeIncIndex(boolean withRollup)
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withDimensionsSpec(new DimensionsSpec(
@@ -156,7 +157,7 @@ public class GroupByLimitPushDownMultiNodeMergeTest
         )
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
   @Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 4f9b1ff..7b73cc1 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -70,6 +70,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.After;
@@ -127,7 +128,7 @@ public class GroupByMultiSegmentTest
 
   private IncrementalIndex makeIncIndex(boolean withRollup)
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withDimensionsSpec(new DimensionsSpec(
@@ -143,7 +144,7 @@ public class GroupByMultiSegmentTest
         )
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
   @Before
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index 5c62fed..6f84d57 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.After;
 import org.junit.Before;
@@ -138,11 +139,11 @@ public class GroupByQueryRunnerFactoryTest
 
   private Segment createSegment() throws Exception
   {
-    IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
+    IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setConcurrentEventAdd(true)
         .setMaxRowCount(5000)
-        .buildOnheap();
+        .build();
 
     StringInputRowParser parser = new StringInputRowParser(
         new CSVParseSpec(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 9d793c7..8a97e89 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -77,6 +77,7 @@ import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.After;
@@ -125,7 +126,7 @@ public class NestedQueryPushDownTest
 
   private IncrementalIndex makeIncIndex()
   {
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withDimensionsSpec(new DimensionsSpec(
@@ -142,7 +143,7 @@ public class NestedQueryPushDownTest
         )
         .setConcurrentEventAdd(true)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
   @Before
diff --git a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
index 6f56e0d..11e00f7 100644
--- a/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/metadata/SegmentAnalyzerTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.data.ObjectStrategy;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetricExtractor;
 import org.apache.druid.segment.serde.ComplexMetricSerde;
 import org.apache.druid.segment.serde.ComplexMetrics;
@@ -275,10 +276,10 @@ public class SegmentAnalyzerTest extends InitializedNullHandlingTest
         .withRollup(true)
         .build();
 
-    final IncrementalIndex retVal = new IncrementalIndex.Builder()
+    final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(10000)
-        .buildOnheap();
+        .build();
     IncrementalIndex incrementalIndex = TestIndex.loadIncrementalIndex(retVal, source);
     QueryableIndex queryableIndex = TestIndex.persistRealtimeAndLoadMMapped(incrementalIndex);
     SegmentAnalyzer analyzer = new SegmentAnalyzer(EnumSet.of(SegmentMetadataQuery.AnalysisType.SIZE));
diff --git a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
index a579df0..93f46d3 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/MultiSegmentScanQueryTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.Interval;
@@ -151,10 +152,10 @@ public class MultiSegmentScanQueryTest extends NullHandlingTest
         .withQueryGranularity(Granularities.HOUR)
         .withMetrics(TestIndex.METRIC_AGGS)
         .build();
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(maxRowCount)
-        .buildOnheap();
+        .build();
   }
 
   @AfterClass
diff --git a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
index aebcf25..16a3590 100644
--- a/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/search/SearchQueryRunnerTest.java
@@ -59,6 +59,7 @@ import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
@@ -719,14 +720,14 @@ public class SearchQueryRunnerTest extends InitializedNullHandlingTest
   @Test
   public void testSearchWithNullValueInDimension() throws Exception
   {
-    IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
+    IncrementalIndex<Aggregator> index = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
                 .build()
         )
         .setMaxRowCount(10)
-        .buildOnheap();
+        .build();
 
     index.add(
         new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
index ecbe8af..6834cec 100644
--- a/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -117,10 +118,10 @@ public class TimeBoundaryQueryRunnerTest
         .withQueryGranularity(Granularities.HOUR)
         .withMetrics(TestIndex.METRIC_AGGS)
         .build();
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(maxRowCount)
-        .buildOnheap();
+        .build();
   }
 
   private static SegmentId makeIdentifier(IncrementalIndex index, String version)
diff --git a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
index a0d33e1..8413c7e 100644
--- a/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.SegmentId;
 import org.junit.Assert;
 import org.junit.Test;
@@ -66,14 +67,14 @@ public class TimeseriesQueryRunnerBonusTest
   @Test
   public void testOneRowAtATime() throws Exception
   {
-    final IncrementalIndex oneRowIndex = new IncrementalIndex.Builder()
+    final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DateTimes.of("2012-01-01T00:00:00Z").getMillis())
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     List<Result<TimeseriesResultValue>> results;
 
diff --git a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
index caeab96..8264460 100644
--- a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -72,10 +73,10 @@ public class EmptyIndexTest
     }
 
     try {
-      IncrementalIndex emptyIndex = new IncrementalIndex.Builder()
+      IncrementalIndex emptyIndex = new OnheapIncrementalIndex.Builder()
           .setSimpleTestingIndexSchema(/* empty */)
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
       IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter(
           Intervals.of("2012-08-01/P3D"),
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
index 2ed65fd..c5d0b3a 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.SegmentId;
@@ -226,10 +227,10 @@ public class IndexBuilder
   )
   {
     Preconditions.checkNotNull(schema, "schema");
-    final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder()
+    final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(maxRows)
-        .buildOnheap();
+        .build();
 
     for (InputRow row : rows) {
       try {
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
index 2fb6bc3..5066bef 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
 import org.junit.Assert;
@@ -253,7 +254,7 @@ public class IndexIOTest extends InitializedNullHandlingTest
     this.exception = exception;
   }
 
-  final IncrementalIndex<Aggregator> incrementalIndex1 = new IncrementalIndex.Builder()
+  final IncrementalIndex<Aggregator> incrementalIndex1 = new OnheapIncrementalIndex.Builder()
       .setIndexSchema(
           new IncrementalIndexSchema.Builder()
               .withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
@@ -268,9 +269,9 @@ public class IndexIOTest extends InitializedNullHandlingTest
               .build()
       )
       .setMaxRowCount(1000000)
-      .buildOnheap();
+      .build();
 
-  final IncrementalIndex<Aggregator> incrementalIndex2 = new IncrementalIndex.Builder()
+  final IncrementalIndex<Aggregator> incrementalIndex2 = new OnheapIncrementalIndex.Builder()
       .setIndexSchema(
           new IncrementalIndexSchema.Builder()
               .withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis())
@@ -285,7 +286,7 @@ public class IndexIOTest extends InitializedNullHandlingTest
               .build()
       )
       .setMaxRowCount(1000000)
-      .buildOnheap();
+      .build();
 
   IndexableAdapter adapter1;
   IndexableAdapter adapter2;
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index b16429f..124554e 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -57,6 +57,7 @@ import org.apache.druid.segment.data.IncrementalIndexTest;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexAdapter;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
@@ -270,10 +271,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
     IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
     IncrementalIndexTest.populateIndex(timestamp, toPersist1);
 
-    IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+    IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     toPersist2.add(
         new MapBasedInputRow(
@@ -344,15 +345,15 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
   @Test
   public void testPersistEmptyColumn() throws Exception
   {
-    final IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+    final IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(/* empty */)
         .setMaxRowCount(10)
-        .buildOnheap();
+        .build();
 
-    final IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+    final IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(/* empty */)
         .setMaxRowCount(10)
-        .buildOnheap();
+        .build();
 
     final File tmpDir1 = temporaryFolder.newFolder();
     final File tmpDir2 = temporaryFolder.newFolder();
@@ -840,18 +841,18 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         .build();
 
 
-    IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+    IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(1000)
-        .buildOnheap();
-    IncrementalIndex toPersist2 = new IncrementalIndex.Builder()
+        .build();
+    IncrementalIndex toPersist2 = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(1000)
-        .buildOnheap();
-    IncrementalIndex toPersist3 = new IncrementalIndex.Builder()
+        .build();
+    IncrementalIndex toPersist3 = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2"));
     addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2"));
@@ -1027,10 +1028,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
 
     for (IncrementalIndexSchema indexSchema : Arrays.asList(rollupIndexSchema, noRollupIndexSchema)) {
 
-      IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+      IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(indexSchema)
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
       toPersistA.add(
           new MapBasedInputRow(
@@ -1047,10 +1048,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
           )
       );
 
-      IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+      IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(indexSchema)
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
       toPersistB.add(
           new MapBasedInputRow(
@@ -1193,10 +1194,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         .withMetrics(new CountAggregatorFactory("count"))
         .withRollup(false)
         .build();
-    IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     toPersistA.add(
         new MapBasedInputRow(
@@ -1217,10 +1218,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         )
     );
 
-    IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     toPersistB.add(
         new MapBasedInputRow(
@@ -1330,10 +1331,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
     IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3"));
     addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2"));
 
-    IncrementalIndex toPersistBA2 = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     toPersistBA2.add(
         new MapBasedInputRow(
@@ -1885,10 +1886,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         .withMetrics(new CountAggregatorFactory("count"))
         .build();
 
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
 
@@ -1932,10 +1933,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
 
   private IncrementalIndex getIndexD3() throws Exception
   {
-    IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+    IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     toPersist1.add(
         new MapBasedInputRow(
@@ -1966,10 +1967,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
 
   private IncrementalIndex getSingleDimIndex(String dimName, List<String> values) throws Exception
   {
-    IncrementalIndex toPersist1 = new IncrementalIndex.Builder()
+    IncrementalIndex toPersist1 = new OnheapIncrementalIndex.Builder()
         .setSimpleTestingIndexSchema(new CountAggregatorFactory("count"))
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     addDimValuesToIndex(toPersist1, dimName, values);
     return toPersist1;
@@ -1989,10 +1990,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         .withMetrics(new CountAggregatorFactory("count"))
         .build();
 
-    return new IncrementalIndex.Builder()
+    return new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
   }
 
   private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators)
@@ -2225,10 +2226,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         .withRollup(true)
         .build();
 
-    IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     Map<String, Object> event1 = new HashMap<>();
     event1.put("dimA", "leek");
@@ -2243,10 +2244,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
     toPersistA.add(new MapBasedInputRow(1, dims, event1));
     toPersistA.add(new MapBasedInputRow(1, dims, event2));
 
-    IncrementalIndex toPersistB = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistB = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     Map<String, Object> event3 = new HashMap<>();
     event3.put("dimA", "leek");
@@ -2472,10 +2473,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
         multivalEvent9
     );
 
-    IncrementalIndex toPersistA = new IncrementalIndex.Builder()
+    IncrementalIndex toPersistA = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     for (Map<String, Object> event : events) {
       toPersistA.add(new MapBasedInputRow(1, dims, event));
@@ -2488,10 +2489,10 @@ public class IndexMergerTestBase extends InitializedNullHandlingTest
 
     List<QueryableIndex> singleEventIndexes = new ArrayList<>();
     for (Map<String, Object> event : events) {
-      IncrementalIndex toPersist = new IncrementalIndex.Builder()
+      IncrementalIndex toPersist = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(indexSchema)
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
       toPersist.add(new MapBasedInputRow(1, dims, event));
       final File tmpDir = temporaryFolder.newFolder();
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
index 4baaddf..b1866ff 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
@@ -133,7 +134,7 @@ public class IndexMergerV9CompatibilityTest
   @Before
   public void setUp() throws IOException
   {
-    toPersist = new IncrementalIndex.Builder()
+    toPersist = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(JodaUtils.MIN_INSTANT)
@@ -141,7 +142,7 @@ public class IndexMergerV9CompatibilityTest
                 .build()
         )
     .setMaxRowCount(1000000)
-    .buildOnheap();
+    .build();
 
     toPersist.getMetadata().put("key", "value");
     for (InputRow event : events) {
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
index 7a0f8e7..8e5da94 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
@@ -47,6 +47,7 @@ import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
@@ -101,7 +102,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
 
   private static IncrementalIndex makeIncrementalIndex() throws IOException
   {
-    IncrementalIndex theIndex = new IncrementalIndex.Builder()
+    IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -126,7 +127,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
                 ).build()
         )
         .setMaxRowCount(NUM_POINTS)
-        .buildOnheap();
+        .build();
 
     theIndex.add(
         new MapBasedInputRow(
@@ -276,7 +277,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
   )
   {
     try {
-      IncrementalIndex first = new IncrementalIndex.Builder()
+      IncrementalIndex first = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -301,9 +302,9 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
                   ).build()
           )
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex second = new IncrementalIndex.Builder()
+      IncrementalIndex second = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -328,9 +329,9 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
                   ).build()
           )
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex third = new IncrementalIndex.Builder()
+      IncrementalIndex third = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -355,7 +356,7 @@ public class IndexMergerV9WithSpatialIndexTest extends InitializedNullHandlingTe
                   ).build()
           )
           .setMaxRowCount(NUM_POINTS)
-          .buildOnheap();
+          .build();
 
       first.add(
           new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
index 6acd72a..7328061 100644
--- a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.Overshadowable;
@@ -149,7 +150,7 @@ public class SchemalessIndexTest
         final long timestamp = new DateTime(event.get(TIMESTAMP), ISOChronology.getInstanceUTC()).getMillis();
 
         if (theIndex == null) {
-          theIndex = new IncrementalIndex.Builder()
+          theIndex = new OnheapIncrementalIndex.Builder()
               .setIndexSchema(
                   new IncrementalIndexSchema.Builder()
                       .withMinTimestamp(timestamp)
@@ -158,7 +159,7 @@ public class SchemalessIndexTest
                       .build()
               )
               .setMaxRowCount(1000)
-              .buildOnheap();
+              .build();
         }
 
         final List<String> dims = new ArrayList<>();
@@ -369,7 +370,7 @@ public class SchemalessIndexTest
             }
           }
 
-          final IncrementalIndex rowIndex = new IncrementalIndex.Builder()
+          final IncrementalIndex rowIndex = new OnheapIncrementalIndex.Builder()
               .setIndexSchema(
                   new IncrementalIndexSchema.Builder()
                       .withMinTimestamp(timestamp)
@@ -378,7 +379,7 @@ public class SchemalessIndexTest
                       .build()
               )
               .setMaxRowCount(1000)
-              .buildOnheap();
+              .build();
 
           rowIndex.add(
               new MapBasedInputRow(timestamp, dims, event)
@@ -406,7 +407,7 @@ public class SchemalessIndexTest
     String filename = resource.getFile();
     log.info("Realtime loading index file[%s]", filename);
 
-    final IncrementalIndex retVal = new IncrementalIndex.Builder()
+    final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
@@ -415,7 +416,7 @@ public class SchemalessIndexTest
                 .build()
         )
         .setMaxRowCount(1000)
-        .buildOnheap();
+        .build();
 
     try {
       final List<Object> events = JSON_MAPPER.readValue(new File(filename), List.class);
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index 58bf56c..020a4ec 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -293,10 +294,10 @@ public class TestIndex
         .withMetrics(METRIC_AGGS)
         .withRollup(rollup)
         .build();
-    final IncrementalIndex retVal = new IncrementalIndex.Builder()
+    final IncrementalIndex retVal = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(schema)
         .setMaxRowCount(10000)
-        .buildOnheap();
+        .build();
 
     try {
       return loadIncrementalIndex(retVal, source);
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index 534034b..9b5d2f3 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.data;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -28,7 +29,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Accumulator;
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.FinalizeResultsQueryRunner;
 import org.apache.druid.query.QueryPlus;
@@ -45,7 +44,6 @@ import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.Result;
-import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -63,21 +61,18 @@ import org.apache.druid.segment.CloserRule;
 import org.apache.druid.segment.IncrementalIndexSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexCreator;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
-import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -96,70 +91,24 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RunWith(Parameterized.class)
 public class IncrementalIndexTest extends InitializedNullHandlingTest
 {
-  interface IndexCreator
-  {
-    IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories);
-  }
-
-  private static final Closer RESOURCE_CLOSER = Closer.create();
-
-  @AfterClass
-  public static void teardown() throws IOException
-  {
-    RESOURCE_CLOSER.close();
-  }
-
-  private final IndexCreator indexCreator;
+  public final IncrementalIndexCreator indexCreator;
 
   @Rule
-  public final CloserRule closerRule = new CloserRule(false);
+  public final CloserRule closer = new CloserRule(false);
 
-  public IncrementalIndexTest(IndexCreator indexCreator)
+  public IncrementalIndexTest(String indexType, String mode) throws JsonProcessingException
   {
-    this.indexCreator = indexCreator;
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setSimpleTestingIndexSchema("rollup".equals(mode), (AggregatorFactory[]) args[0])
+        .setMaxRowCount(1_000_000)
+        .build()
+    ));
   }
 
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "{index}: {0}, {1}")
   public static Collection<?> constructorFeeder()
   {
-    final List<Object[]> params = new ArrayList<>();
-    params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createIndex});
-    final CloseableStupidPool<ByteBuffer> pool1 = new CloseableStupidPool<>(
-        "OffheapIncrementalIndex-bufferPool",
-        () -> ByteBuffer.allocate(256 * 1024)
-    );
-    RESOURCE_CLOSER.register(pool1);
-    params.add(
-        new Object[] {
-            (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
-                .setBufferPool(pool1)
-                .setSimpleTestingIndexSchema(factories)
-                .setMaxRowCount(1000000)
-                .build()
-        }
-    );
-    params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
-    final CloseableStupidPool<ByteBuffer> pool2 = new CloseableStupidPool<>(
-        "OffheapIncrementalIndex-bufferPool",
-        () -> ByteBuffer.allocate(256 * 1024)
-    );
-    RESOURCE_CLOSER.register(pool2);
-    params.add(
-        new Object[] {
-            (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
-                .setBufferPool(pool2)
-                .setIndexSchema(
-                    new IncrementalIndexSchema.Builder()
-                        .withMetrics(factories)
-                        .withRollup(false)
-                        .build()
-                )
-                .setMaxRowCount(1000000)
-                .build()
-        }
-    );
-
-    return params;
+    return IncrementalIndexCreator.indexTypeCartesianProduct(ImmutableList.of("rollup", "plain"));
   }
 
   public static AggregatorFactory[] getDefaultCombiningAggregatorFactories()
@@ -268,7 +217,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   public void testCaseSensitivity() throws Exception
   {
     long timestamp = System.currentTimeMillis();
-    IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(DEFAULT_AGGREGATOR_FACTORIES));
+    IncrementalIndex<?> index = indexCreator.createIndex((Object) DEFAULT_AGGREGATOR_FACTORIES);
 
     populateIndex(timestamp, index);
     Assert.assertEquals(Arrays.asList("dim1", "dim2"), index.getDimensionNames());
@@ -290,27 +239,25 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   public void testFilteredAggregators() throws Exception
   {
     long timestamp = System.currentTimeMillis();
-    IncrementalIndex index = closerRule.closeLater(
-        indexCreator.createIndex(new AggregatorFactory[]{
-            new CountAggregatorFactory("count"),
-            new FilteredAggregatorFactory(
-                new CountAggregatorFactory("count_selector_filtered"),
-                new SelectorDimFilter("dim2", "2", null)
-            ),
-            new FilteredAggregatorFactory(
-                new CountAggregatorFactory("count_bound_filtered"),
-                new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
-            ),
-            new FilteredAggregatorFactory(
-                new CountAggregatorFactory("count_multivaldim_filtered"),
-                new SelectorDimFilter("dim3", "b", null)
-            ),
-            new FilteredAggregatorFactory(
-                new CountAggregatorFactory("count_numeric_filtered"),
-                new SelectorDimFilter("met1", "11", null)
-            )
-        })
-    );
+    IncrementalIndex<?> index = indexCreator.createIndex((Object) new AggregatorFactory[]{
+        new CountAggregatorFactory("count"),
+        new FilteredAggregatorFactory(
+            new CountAggregatorFactory("count_selector_filtered"),
+            new SelectorDimFilter("dim2", "2", null)
+        ),
+        new FilteredAggregatorFactory(
+            new CountAggregatorFactory("count_bound_filtered"),
+            new BoundDimFilter("dim2", "2", "3", false, true, null, null, StringComparators.NUMERIC)
+        ),
+        new FilteredAggregatorFactory(
+            new CountAggregatorFactory("count_multivaldim_filtered"),
+            new SelectorDimFilter("dim3", "b", null)
+        ),
+        new FilteredAggregatorFactory(
+            new CountAggregatorFactory("count_numeric_filtered"),
+            new SelectorDimFilter("met1", "11", null)
+        )
+    });
 
     index.add(
         new MapBasedInputRow(
@@ -386,11 +333,9 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
       );
     }
 
-    final IncrementalIndex index = closerRule.closeLater(
-        indexCreator.createIndex(
-            ingestAggregatorFactories.toArray(
-                new AggregatorFactory[0]
-            )
+    final IncrementalIndex<?> index = indexCreator.createIndex(
+        (Object) ingestAggregatorFactories.toArray(
+            new AggregatorFactory[0]
         )
     );
 
@@ -501,8 +446,8 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
     }
 
 
-    final IncrementalIndex index = closerRule.closeLater(
-        indexCreator.createIndex(ingestAggregatorFactories.toArray(new AggregatorFactory[0]))
+    final IncrementalIndex<?> index = indexCreator.createIndex(
+        (Object) ingestAggregatorFactories.toArray(new AggregatorFactory[0])
     );
     final int concurrentThreads = 2;
     final int elementsPerThread = 10_000;
@@ -679,7 +624,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testConcurrentAdd() throws Exception
   {
-    final IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex(DEFAULT_AGGREGATOR_FACTORIES));
+    final IncrementalIndex<?> index = indexCreator.createIndex((Object) DEFAULT_AGGREGATOR_FACTORIES);
     final int threadCount = 10;
     final int elementsPerThread = 200;
     final int dimensionCount = 5;
@@ -725,22 +670,23 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testgetDimensions()
   {
-    final IncrementalIndex<Aggregator> incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withMetrics(new CountAggregatorFactory("count"))
-                .withDimensionsSpec(
-                    new DimensionsSpec(
-                        DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
-                        null,
-                        null
+    final IncrementalIndex<?> incrementalIndex = indexCreator.createIndex(
+        (builder, args) -> builder
+            .setIndexSchema(
+                new IncrementalIndexSchema.Builder()
+                    .withMetrics(new CountAggregatorFactory("count"))
+                    .withDimensionsSpec(
+                        new DimensionsSpec(
+                            DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")),
+                            null,
+                            null
+                        )
                     )
-                )
-                .build()
-        )
-        .setMaxRowCount(1000000)
-        .build();
-    closerRule.closeLater(incrementalIndex);
+                    .build()
+            )
+            .setMaxRowCount(1000000)
+            .build()
+    );
 
     Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
   }
@@ -748,11 +694,13 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testDynamicSchemaRollup() throws IndexSizeExceededException
   {
-    IncrementalIndex<Aggregator> index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(/* empty */)
-        .setMaxRowCount(10)
-        .build();
-    closerRule.closeLater(index);
+    final IncrementalIndex<?> index = indexCreator.createIndex(
+        (builder, args) -> builder
+            .setSimpleTestingIndexSchema(/* empty */)
+            .setMaxRowCount(10)
+            .build()
+    );
+
     index.add(
         new MapBasedInputRow(
             1481871600000L,
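
One detail in the rewritten tests above deserves a note: IncrementalIndexCreator.createIndex
takes Object... varargs, so an AggregatorFactory[] must be cast to Object to arrive as a single
argument rather than being unpacked into the varargs array. A short sketch of the pitfall
(the factories variable is illustrative):

    AggregatorFactory[] factories = {new CountAggregatorFactory("count")};
    indexCreator.createIndex(factories);          // array is unpacked: args == factories
    indexCreator.createIndex((Object) factories); // args[0] == factories, as the creator expects
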
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
index 5cc8190..94e413c 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
@@ -56,6 +56,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Interval;
 import org.junit.Test;
@@ -114,7 +115,7 @@ public class SpatialFilterBonusTest
 
   private static IncrementalIndex makeIncrementalIndex() throws IOException
   {
-    IncrementalIndex theIndex = new IncrementalIndex.Builder()
+    IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -134,7 +135,7 @@ public class SpatialFilterBonusTest
                 ).build()
         )
         .setMaxRowCount(NUM_POINTS)
-        .buildOnheap();
+        .build();
 
     theIndex.add(
         new MapBasedInputRow(
@@ -261,7 +262,7 @@ public class SpatialFilterBonusTest
   )
   {
     try {
-      IncrementalIndex first = new IncrementalIndex.Builder()
+      IncrementalIndex first = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -282,9 +283,9 @@ public class SpatialFilterBonusTest
                   ).build()
           )
           .setMaxRowCount(NUM_POINTS)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex second = new IncrementalIndex.Builder()
+      IncrementalIndex second = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -304,9 +305,9 @@ public class SpatialFilterBonusTest
                   ).build()
           )
           .setMaxRowCount(NUM_POINTS)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex third = new IncrementalIndex.Builder()
+      IncrementalIndex third = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -327,7 +328,7 @@ public class SpatialFilterBonusTest
                   ).build()
           )
           .setMaxRowCount(NUM_POINTS)
-          .buildOnheap();
+          .build();
 
       first.add(
           new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index 8b961b6..47835bf 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.Interval;
@@ -113,7 +114,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
 
   private static IncrementalIndex makeIncrementalIndex() throws IOException
   {
-    IncrementalIndex theIndex = new IncrementalIndex.Builder()
+    IncrementalIndex theIndex = new OnheapIncrementalIndex.Builder()
         .setIndexSchema(
             new IncrementalIndexSchema.Builder()
                 .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -138,7 +139,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
                 ).build()
         )
         .setMaxRowCount(NUM_POINTS)
-        .buildOnheap();
+        .build();
 
     theIndex.add(
         new MapBasedInputRow(
@@ -279,7 +280,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
   private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec)
   {
     try {
-      IncrementalIndex first = new IncrementalIndex.Builder()
+      IncrementalIndex first = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -303,9 +304,9 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
                   ).build()
           )
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex second = new IncrementalIndex.Builder()
+      IncrementalIndex second = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -329,9 +330,9 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
                   ).build()
           )
           .setMaxRowCount(1000)
-          .buildOnheap();
+          .build();
 
-      IncrementalIndex third = new IncrementalIndex.Builder()
+      IncrementalIndex third = new OnheapIncrementalIndex.Builder()
           .setIndexSchema(
               new IncrementalIndexSchema.Builder()
                   .withMinTimestamp(DATA_INTERVAL.getStartMillis())
@@ -355,7 +356,7 @@ public class SpatialFilterTest extends InitializedNullHandlingTest
                   ).build()
           )
           .setMaxRowCount(NUM_POINTS)
-          .buildOnheap();
+          .build();
 
       first.add(
           new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
index 814e8ae..0da7d87 100644
--- a/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/generator/DataGeneratorTest.java
@@ -21,8 +21,17 @@ package org.apache.druid.segment.generator;
 
 import org.apache.commons.math3.distribution.NormalDistribution;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -34,7 +43,7 @@ import java.util.List;
 import java.util.Map;
 
 // Doesn't assert behavior right now, just generates rows and prints out some distribution numbers
-public class DataGeneratorTest
+public class DataGeneratorTest extends InitializedNullHandlingTest
 {
   @Test
   public void testSequential()
@@ -538,4 +547,124 @@ public class DataGeneratorTest
       }
     }
   }
+
+  @Test
+  public void testToList()
+  {
+    List<GeneratorColumnSchema> schemas = new ArrayList<>();
+    RowValueTracker tracker = new RowValueTracker();
+
+    schemas.add(
+        GeneratorColumnSchema.makeSequential(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            10,
+            20
+        )
+    );
+
+    schemas.add(
+        GeneratorColumnSchema.makeEnumeratedSequential(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar")
+        )
+    );
+
+    schemas.add(
+        GeneratorColumnSchema.makeSequential(
+            "dimC",
+            ValueType.STRING,
+            false,
+            1,
+            0.50,
+            30,
+            40
+        )
+    );
+
+    DataGenerator dataGenerator = new DataGenerator(schemas, 9999, 0, 0, 1000.0);
+    List<InputRow> rows = dataGenerator.toList(100);
+    Assert.assertEquals(100, rows.size());
+
+    for (InputRow row : rows) {
+      tracker.addRow(row);
+    }
+    tracker.printStuff();
+  }
+
+  @Test
+  public void testToIndex()
+  {
+    List<GeneratorColumnSchema> schemas = new ArrayList<>();
+
+    schemas.add(
+        GeneratorColumnSchema.makeSequential(
+            "dimA",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            10,
+            20
+        )
+    );
+
+    schemas.add(
+        GeneratorColumnSchema.makeEnumeratedSequential(
+            "dimB",
+            ValueType.STRING,
+            false,
+            1,
+            null,
+            Arrays.asList("Hello", "World", "Foo", "Bar")
+        )
+    );
+
+    schemas.add(
+        GeneratorColumnSchema.makeSequential(
+            "dimC",
+            ValueType.STRING,
+            false,
+            1,
+            0.50,
+            30,
+            40
+        )
+    );
+
+    DataGenerator dataGenerator = new DataGenerator(schemas, 9999, 0, 0, 1000.0);
+
+    DimensionsSpec dimensions = new DimensionsSpec(
+        Arrays.asList(
+            new StringDimensionSchema("dimA"),
+            new StringDimensionSchema("dimB"),
+            new StringDimensionSchema("dimC")
+        ), null, null
+    );
+    AggregatorFactory[] metrics = {
+        new CountAggregatorFactory("cnt")
+    };
+    final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
+        .withQueryGranularity(Granularities.MINUTE)
+        .withDimensionsSpec(dimensions)
+        .withMetrics(metrics)
+        .withRollup(false)
+        .build();
+
+    IncrementalIndex<?> index = new OnheapIncrementalIndex.Builder()
+        .setIndexSchema(schema)
+        .setSortFacts(false)
+        .setMaxRowCount(1_000_000)
+        .build();
+
+    dataGenerator.addToIndex(index, 100);
+    Assert.assertEquals(100, index.size());
+  }
 }
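
The two new tests exercise the DataGenerator conveniences added by this commit: toList(n)
materializes n generated rows, while addToIndex(index, n) feeds n rows straight into an
incremental index. A sketch of how a benchmark might combine them, reusing the schemas and
seed from the tests above:

    DataGenerator dataGenerator = new DataGenerator(schemas, 9999, 0, 0, 1000.0);
    List<InputRow> sample = dataGenerator.toList(10); // small sample for inspection
    dataGenerator.addToIndex(index, 100_000);         // bulk-load an incremental index
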
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
index 32a26a5..e9c6139 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexAdapterTest.java
@@ -19,7 +19,10 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.IndexableAdapter;
 import org.apache.druid.segment.RowIterator;
@@ -31,12 +34,17 @@ import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
 import org.apache.druid.segment.data.IncrementalIndexTest;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.function.Function;
 
+@RunWith(Parameterized.class)
 public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
 {
   private static final IndexSpec INDEX_SPEC = new IndexSpec(
@@ -46,11 +54,31 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
       CompressionFactory.LongEncodingStrategy.LONGS
   );
 
+  public final IncrementalIndexCreator indexCreator;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
+
+  public IncrementalIndexAdapterTest(String indexType) throws JsonProcessingException
+  {
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setSimpleTestingIndexSchema("rollup".equals(args[0]), new CountAggregatorFactory("count"))
+        .setMaxRowCount(1_000_000)
+        .build()
+    ));
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    return IncrementalIndexCreator.getAppendableIndexTypes();
+  }
+
   @Test
   public void testGetBitmapIndex() throws Exception
   {
     final long timestamp = System.currentTimeMillis();
-    IncrementalIndex incrementalIndex = IncrementalIndexTest.createIndex(null);
+    IncrementalIndex<?> incrementalIndex = indexCreator.createIndex("rollup");
     IncrementalIndexTest.populateIndex(timestamp, incrementalIndex);
     IndexableAdapter adapter = new IncrementalIndexAdapter(
         incrementalIndex.getInterval(),
@@ -70,7 +98,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
   public void testGetRowsIterable() throws Exception
   {
     final long timestamp = System.currentTimeMillis();
-    IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null);
+    IncrementalIndex<?> toPersist1 = indexCreator.createIndex("rollup");
     IncrementalIndexTest.populateIndex(timestamp, toPersist1);
 
     final IndexableAdapter incrementalAdapter = new IncrementalIndexAdapter(
@@ -94,7 +122,7 @@ public class IncrementalIndexAdapterTest extends InitializedNullHandlingTest
   public void testGetRowsIterableNoRollup() throws Exception
   {
     final long timestamp = System.currentTimeMillis();
-    IncrementalIndex toPersist1 = IncrementalIndexTest.createNoRollupIndex(null);
+    IncrementalIndex<?> toPersist1 = indexCreator.createIndex("plain");
     IncrementalIndexTest.populateIndex(timestamp, toPersist1);
     IncrementalIndexTest.populateIndex(timestamp, toPersist1);
     IncrementalIndexTest.populateIndex(timestamp, toPersist1);
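
The wiring above is the template that the remaining parameterized tests in this patch follow:
the constructor builds an IncrementalIndexCreator for the injected index type, a CloserRule
closes the creator (and every index it produced) after each test, and constructorFeeder()
supplies the index type names. Condensed to a skeleton (the class name is hypothetical):

    @RunWith(Parameterized.class)
    public class MyIncrementalIndexTest
    {
      @Rule
      public final CloserRule closer = new CloserRule(false);

      public final IncrementalIndexCreator indexCreator;

      public MyIncrementalIndexTest(String indexType) throws JsonProcessingException
      {
        indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
            .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
            .setMaxRowCount(1_000)
            .build()
        ));
      }

      @Parameterized.Parameters(name = "{index}: {0}")
      public static Collection<?> constructorFeeder()
      {
        return IncrementalIndexCreator.getAppendableIndexTypes();
      }
    }
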
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
new file mode 100644
index 0000000..3f112cc
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCreator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
+import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.jsontype.SubtypeResolver;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.io.Closer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * This class handles the incremental-index lifecycle for testing.
+ * Any index created using this class during the test will be closed automatically once this class is closed.
+ *
+ * To allow testing multiple incremental-index implementations, this class can be instantiated with any
+ * {@code AppendableIndexSpec} instance.
+ * Alternatively, this class can instantiate an {@code AppendableIndexSpec} for you given the appendable-index type as
+ * a string.
+ * This allows parameterizing tests with the appendable-index type as a string.
+ *
+ * To further facilitate the tests' parameterization, this class supports listing all the available incremental-index
+ * implementations, and producing a cartesian product of many parameter options together with each incremental-index
+ * implementation.
+ */
+public class IncrementalIndexCreator implements Closeable
+{
+  public static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+
+  /**
+ * Allows adding support for testing unregistered index types.
+ * It is used by Druid extensions that provide their own incremental-index implementations.
+   *
+   * @param c    an index spec class
+   * @param name an index spec name
+   */
+  public static void addIndexSpec(Class<?> c, String name)
+  {
+    JSON_MAPPER.registerSubtypes(new NamedType(c, name));
+  }
+
+  static {
+    // The off-heap incremental-index is not registered for production, but we want to include it in the tests.
+    IncrementalIndexCreator.addIndexSpec(OffheapIncrementalIndexTestSpec.class, OffheapIncrementalIndexTestSpec.TYPE);
+  }
+
+  /**
+   * Fetch all the available incremental-index implementations.
+   * It can be used to parametrize the test. If more parameters are needed, use indexTypeCartesianProduct().
+   * @see #indexTypeCartesianProduct(Collection[]).
+   *
+   * @return a list of all the incremental-index implementation types (String)
+   */
+  public static List<String> getAppendableIndexTypes()
+  {
+    SubtypeResolver resolver = JSON_MAPPER.getSubtypeResolver();
+    MapperConfig<?> config = JSON_MAPPER.getDeserializationConfig();
+    AnnotatedClass cls = AnnotatedClassResolver.resolveWithoutSuperTypes(config, AppendableIndexSpec.class);
+    Collection<NamedType> types = resolver.collectAndResolveSubtypesByClass(config, cls);
+    return types.stream().map(NamedType::getName).filter(Objects::nonNull).distinct().collect(Collectors.toList());
+  }
+
+  public interface IndexCreator
+  {
+    /**
+     * Build an index given a builder and args.
+     *
+     * @param builder an incremental index builder supplied by the framework
+     * @param args a list of arguments that are used to configure the builder
+     * @return a new instance of an incremental-index
+     */
+    IncrementalIndex<?> createIndex(AppendableIndexBuilder builder, Object... args);
+  }
+
+  private final Closer closer = Closer.create();
+
+  private final AppendableIndexSpec appendableIndexSpec;
+
+  private final IndexCreator indexCreator;
+
+  /**
+   * Initialize the creator.
+   *
+   * @param spec a spec that can generate an incremental-index builder
+   * @param indexCreator a function that generates an index given a builder and arguments
+   */
+  public IncrementalIndexCreator(AppendableIndexSpec spec, IndexCreator indexCreator)
+  {
+    this.appendableIndexSpec = spec;
+    this.indexCreator = indexCreator;
+  }
+
+  /**
+   * Initialize the creator.
+   *
+   * @param indexType an index type (name)
+   * @param indexCreator a function that generates an index given a builder and arguments
+   */
+  public IncrementalIndexCreator(String indexType, IndexCreator indexCreator) throws JsonProcessingException
+  {
+    this(parseIndexType(indexType), indexCreator);
+  }
+
+  /**
+   * Generate an AppendableIndexSpec from an index type name.
+   *
+   * @param indexType an index type
+   * @return AppendableIndexSpec instance of this type
+   * @throws JsonProcessingException if the index type cannot be parsed
+   */
+  public static AppendableIndexSpec parseIndexType(String indexType) throws JsonProcessingException
+  {
+    return JSON_MAPPER.readValue(
+        StringUtils.format("{\"type\": \"%s\"}", indexType),
+        AppendableIndexSpec.class
+    );
+  }
+
+  /**
+   * Create an index given the input args.
+   *
+   * @param args The arguments for the index-generator
+   * @return An incremental-index instance
+   */
+  public final IncrementalIndex<?> createIndex(Object... args)
+  {
+    return createIndex(indexCreator, args);
+  }
+
+  /**
+   * Create an index given the input args with a specialized index-creator.
+   *
+   * @param args The arguments for the index-generator
+   * @return An incremental-index instance
+   */
+  public final IncrementalIndex<?> createIndex(IndexCreator indexCreator, Object... args)
+  {
+    return closer.register(indexCreator.createIndex(appendableIndexSpec.builder(), args));
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    closer.close();
+
+    if (appendableIndexSpec instanceof Closeable) {
+      ((Closeable) appendableIndexSpec).close();
+    }
+  }
+
+  /**
+   * Generates all the permutations of the parameters with each of the registered appendable index types.
+   * It is used to parameterize the tests with all the permutations of the parameters
+   * together with all the appendable index types.
+   *
+   * For example, for a parameterized test with the following constructor:
+   * {@code
+   *   public IncrementalIndexTest(String indexType, String mode, boolean deserializeComplexMetrics)
+   *   {
+   *     ...
+   *   }
+   * }
+   *
+   * we can test all the input combinations as follows:
+   * {@code
+   *   @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
+   *   public static Collection<?> constructorFeeder()
+   *   {
+   *     return IncrementalIndexCreator.indexTypeCartesianProduct(
+   *         ImmutableList.of("rollup", "plain"),
+   *         ImmutableList.of(true, false)
+   *     );
+   *   }
+   * }
+   *
+   * @param c a list of collections of parameters
+   * @return the cartesian product of all parameters and appendable index types
+   */
+  public static List<Object[]> indexTypeCartesianProduct(Collection<?>... c)
+  {
+    Collection<?>[] args = new Collection<?>[c.length + 1];
+    args[0] = getAppendableIndexTypes();
+    System.arraycopy(c, 0, args, 1, c.length);
+    return cartesianProduct(args);
+  }
+
+  /**
+   * Generates all the permutations of the parameters.
+   *
+   * @param c a list of collections of parameters
+   * @return the cartesian product of all parameters
+   */
+  private static List<Object[]> cartesianProduct(Collection<?>... c)
+  {
+    final ArrayList<Object[]> res = new ArrayList<>();
+    final int curLength = c.length;
+
+    if (curLength == 0) {
+      res.add(new Object[0]);
+      return res;
+    }
+
+    final int curItem = curLength - 1;
+    for (Object[] objList : cartesianProduct(Arrays.copyOfRange(c, 0, curItem))) {
+      for (Object o : c[curItem]) {
+        Object[] newObjList = Arrays.copyOf(objList, curLength);
+        newObjList[curItem] = o;
+        res.add(newObjList);
+      }
+    }
+
+    return res;
+  }
+}
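
Since IncrementalIndexCreator resolves index types through Jackson named subtypes, an extension
can opt its own implementation into these shared tests by registering a spec before the test
parameters are collected. A hedged sketch with a hypothetical spec class:

    // MyExtensionIndexSpec is hypothetical; the registered name becomes a test parameter value
    IncrementalIndexCreator.addIndexSpec(MyExtensionIndexSpec.class, "myExtensionIndex");
    // getAppendableIndexTypes() and indexTypeCartesianProduct(...) now include "myExtensionIndex"
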
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
similarity index 68%
rename from processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
rename to processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
index d475a57..5886f7c 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/OnheapIncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexIngestionTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.MapBasedInputRow;
@@ -30,32 +31,54 @@ import org.apache.druid.query.aggregation.LongMaxAggregator;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
+@RunWith(Parameterized.class)
+public class IncrementalIndexIngestionTest extends InitializedNullHandlingTest
 {
-  private static final int MAX_ROWS = 100000;
+  private static final int MAX_ROWS = 100_000;
+
+  public final IncrementalIndexCreator indexCreator;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
+
+  public IncrementalIndexIngestionTest(String indexType) throws JsonProcessingException
+  {
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setIndexSchema((IncrementalIndexSchema) args[0])
+        .setMaxRowCount(MAX_ROWS)
+        .build()
+    ));
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    return IncrementalIndexCreator.getAppendableIndexTypes();
+  }
 
   @Test
   public void testMultithreadAddFacts() throws Exception
   {
-    final IncrementalIndex index = new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withQueryGranularity(Granularities.MINUTE)
-                .withMetrics(new LongMaxAggregatorFactory("max", "max"))
-                .build()
-        )
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+    final IncrementalIndex<?> index = indexCreator.createIndex(new IncrementalIndexSchema.Builder()
+        .withQueryGranularity(Granularities.MINUTE)
+        .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+        .build()
+    );
 
     final int addThreadCount = 2;
     Thread[] addThreads = new Thread[addThreadCount];
@@ -111,39 +134,33 @@ public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testMultithreadAddFactsUsingExpressionAndJavaScript() throws Exception
   {
-    final IncrementalIndex indexExpr = new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withQueryGranularity(Granularities.MINUTE)
-                .withMetrics(new LongSumAggregatorFactory(
-                    "oddnum",
-                    null,
-                    "if(value%2==1,1,0)",
-                    TestExprMacroTable.INSTANCE
-                ))
-                .withRollup(true)
-                .build()
-        )
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
-
-    final IncrementalIndex indexJs = new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withQueryGranularity(Granularities.MINUTE)
-                .withMetrics(new JavaScriptAggregatorFactory(
-                    "oddnum",
-                    ImmutableList.of("value"),
-                    "function(current, value) { if (value%2==1) current = current + 1; return current;}",
-                    "function() {return 0;}",
-                    "function(a, b) { return a + b;}",
-                    JavaScriptConfig.getEnabledInstance()
-                ))
-                .withRollup(true)
-                .build()
-        )
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+    final IncrementalIndex<?> indexExpr = indexCreator.createIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(Granularities.MINUTE)
+            .withMetrics(new LongSumAggregatorFactory(
+                "oddnum",
+                null,
+                "if(value%2==1,1,0)",
+                TestExprMacroTable.INSTANCE
+            ))
+            .withRollup(true)
+            .build()
+    );
+
+    final IncrementalIndex<?> indexJs = indexCreator.createIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(Granularities.MINUTE)
+            .withMetrics(new JavaScriptAggregatorFactory(
+                "oddnum",
+                ImmutableList.of("value"),
+                "function(current, value) { if (value%2==1) current = current + 1; return current;}",
+                "function() {return 0;}",
+                "function(a, b) { return a + b;}",
+                JavaScriptConfig.getEnabledInstance()
+            ))
+            .withRollup(true)
+            .build()
+    );
 
     final int addThreadCount = 2;
     Thread[] addThreads = new Thread[addThreadCount];
@@ -205,15 +222,19 @@ public class OnheapIncrementalIndexTest extends InitializedNullHandlingTest
     mockedAggregator.close();
     EasyMock.expectLastCall().times(1);
 
-    final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new IncrementalIndex.Builder()
-        .setIndexSchema(
-            new IncrementalIndexSchema.Builder()
-                .withQueryGranularity(Granularities.MINUTE)
-                .withMetrics(new LongMaxAggregatorFactory("max", "max"))
-                .build()
-        )
-        .setMaxRowCount(MAX_ROWS)
-        .buildOnheap();
+    final IncrementalIndex<?> genericIndex = indexCreator.createIndex(
+        new IncrementalIndexSchema.Builder()
+            .withQueryGranularity(Granularities.MINUTE)
+            .withMetrics(new LongMaxAggregatorFactory("max", "max"))
+            .build()
+    );
+
+    // This test is specific to the on-heap index
+    if (!(genericIndex instanceof OnheapIncrementalIndex)) {
+      return;
+    }
+
+    final OnheapIncrementalIndex index = (OnheapIncrementalIndex) genericIndex;
 
     index.add(new MapBasedInputRow(
             0,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
index dfd386f..24aae3a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.Row;
@@ -28,19 +29,45 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  */
+@RunWith(Parameterized.class)
 public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingTest
 {
+  public final IncrementalIndexCreator indexCreator;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
+
+  public IncrementalIndexMultiValueSpecTest(String indexType) throws JsonProcessingException
+  {
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setIndexSchema((IncrementalIndexSchema) args[0])
+        .setMaxRowCount(10_000)
+        .build()
+    ));
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    return IncrementalIndexCreator.getAppendableIndexTypes();
+  }
+
   @Test
   public void test() throws IndexSizeExceededException
   {
@@ -78,10 +105,7 @@ public class IncrementalIndexMultiValueSpecTest extends InitializedNullHandlingT
         return null;
       }
     };
-    IncrementalIndex<?> index = new IncrementalIndex.Builder()
-        .setIndexSchema(schema)
-        .setMaxRowCount(10000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex(schema);
     index.add(
         new MapBasedInputRow(
             0,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
index f6f95e0..166b332 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowCompTest.java
@@ -19,29 +19,54 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  */
+@RunWith(Parameterized.class)
 public class IncrementalIndexRowCompTest extends InitializedNullHandlingTest
 {
+  public final IncrementalIndexCreator indexCreator;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
+
+  public IncrementalIndexRowCompTest(String indexType) throws JsonProcessingException
+  {
+    indexCreator = closer.closeLater(
+        new IncrementalIndexCreator(indexType, (builder, args) -> builder
+            .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+            .setMaxRowCount(1_000)
+            .build())
+    );
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    return IncrementalIndexCreator.getAppendableIndexTypes();
+  }
+
   @Test
   public void testBasic()
   {
-    IncrementalIndex<?> index = new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-        .setMaxRowCount(1000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex();
 
     long time = System.currentTimeMillis();
     IncrementalIndexRow ir1 = index.toIncrementalIndexRow(toMapRow(time, "billy", "A", "joe", "B")).getIncrementalIndexRow();
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
index 865789e..cfd2e1a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexRowSizeTest.java
@@ -19,30 +19,55 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
  */
+@RunWith(Parameterized.class)
 public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
 {
+  public final IncrementalIndexCreator indexCreator;
+
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
+
+  public IncrementalIndexRowSizeTest(String indexType) throws JsonProcessingException
+  {
+    indexCreator = closer.closeLater(
+        new IncrementalIndexCreator(indexType, (builder, args) -> builder
+            .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+            .setMaxRowCount(10_000)
+            .setMaxBytesInMemory(1_000)
+            .build())
+    );
+  }
+
+  @Parameterized.Parameters(name = "{index}: {0}")
+  public static Collection<?> constructorFeeder()
+  {
+    return IncrementalIndexCreator.getAppendableIndexTypes();
+  }
+
   @Test
   public void testIncrementalIndexRowSizeBasic()
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-        .setMaxRowCount(10000)
-        .setMaxBytesInMemory(1000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     long time = System.currentTimeMillis();
     IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
         time,
@@ -59,11 +84,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
   @Test
   public void testIncrementalIndexRowSizeArr()
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-        .setMaxRowCount(10000)
-        .setMaxBytesInMemory(1000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     long time = System.currentTimeMillis();
     IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
         time + 1,
@@ -80,11 +101,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
   @Test
   public void testIncrementalIndexRowSizeComplex()
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-        .setMaxRowCount(10000)
-        .setMaxBytesInMemory(1000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     long time = System.currentTimeMillis();
     IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
         time + 1,
@@ -101,11 +118,7 @@ public class IncrementalIndexRowSizeTest extends InitializedNullHandlingTest
   @Test
   public void testIncrementalIndexRowSizeEmptyString()
   {
-    IncrementalIndex index = new IncrementalIndex.Builder()
-        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-        .setMaxRowCount(10000)
-        .setMaxBytesInMemory(1000)
-        .buildOnheap();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     long time = System.currentTimeMillis();
     IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
         time + 1,
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
index ec03a9e..6168719 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.base.Suppliers;
@@ -55,6 +56,7 @@ import org.apache.druid.query.groupby.GroupByQueryEngine;
 import org.apache.druid.query.topn.TopNQueryBuilder;
 import org.apache.druid.query.topn.TopNQueryEngine;
 import org.apache.druid.query.topn.TopNResultValue;
+import org.apache.druid.segment.CloserRule;
 import org.apache.druid.segment.ColumnSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
@@ -68,6 +70,7 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -86,46 +89,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 @RunWith(Parameterized.class)
 public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingTest
 {
-  interface IndexCreator
-  {
-    IncrementalIndex createIndex();
-  }
+  public final IncrementalIndexCreator indexCreator;
 
-  private final IndexCreator indexCreator;
+  @Rule
+  public final CloserRule closer = new CloserRule(false);
 
-  public IncrementalIndexStorageAdapterTest(
-      IndexCreator IndexCreator
-  )
+  public IncrementalIndexStorageAdapterTest(String indexType) throws JsonProcessingException
   {
-    this.indexCreator = IndexCreator;
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+        .setMaxRowCount(1_000)
+        .build()
+    ));
   }
 
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "{index}: {0}")
   public static Collection<?> constructorFeeder()
   {
-    return Arrays.asList(
-        new Object[][]{
-            {
-                new IndexCreator()
-                {
-                  @Override
-                  public IncrementalIndex createIndex()
-                  {
-                    return new IncrementalIndex.Builder()
-                        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
-                        .setMaxRowCount(1000)
-                        .buildOnheap();
-                  }
-                }
-            }
-        }
-    );
+    return IncrementalIndexCreator.getAppendableIndexTypes();
   }
 
   @Test
   public void testSanity() throws Exception
   {
-    IncrementalIndex index = indexCreator.createIndex();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             System.currentTimeMillis() - 1,
@@ -189,7 +176,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   @Test
   public void testObjectColumnSelectorOnVaryingColumnSchema() throws Exception
   {
-    IncrementalIndex index = indexCreator.createIndex();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             DateTimes.of("2014-09-01T00:00:00"),
@@ -271,7 +258,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   public void testResetSanity() throws IOException
   {
 
-    IncrementalIndex index = indexCreator.createIndex();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     DateTime t = DateTimes.nowUtc();
     Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1));
 
@@ -331,7 +318,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   @Test
   public void testSingleValueTopN() throws IOException
   {
-    IncrementalIndex index = indexCreator.createIndex();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     DateTime t = DateTimes.nowUtc();
     index.add(
         new MapBasedInputRow(
@@ -373,7 +360,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   @Test
   public void testFilterByNull() throws Exception
   {
-    IncrementalIndex index = indexCreator.createIndex();
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             System.currentTimeMillis() - 1,
@@ -434,7 +421,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   @Test
   public void testCursoringAndIndexUpdationInterleaving() throws Exception
   {
-    final IncrementalIndex index = indexCreator.createIndex();
+    final IncrementalIndex<?> index = indexCreator.createIndex();
     final long timestamp = System.currentTimeMillis();
 
     for (int i = 0; i < 2; i++) {
@@ -498,7 +485,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   {
     // Tests the dictionary ID race condition bug described at https://github.com/apache/druid/pull/6340
 
-    final IncrementalIndex index = indexCreator.createIndex();
+    final IncrementalIndex<?> index = indexCreator.createIndex();
     final long timestamp = System.currentTimeMillis();
 
     for (int i = 0; i < 5; i++) {
@@ -549,7 +536,7 @@ public class IncrementalIndexStorageAdapterTest extends InitializedNullHandlingT
   @Test
   public void testCursoringAndSnapshot() throws Exception
   {
-    final IncrementalIndex index = indexCreator.createIndex();
+    final IncrementalIndex<?> index = indexCreator.createIndex();
     final long timestamp = System.currentTimeMillis();
 
     for (int i = 0; i < 2; i++) {
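
The name = "{index}: {0}" pattern used throughout makes the per-type runs
self-describing in test reports. A hypothetical expansion, assuming the
registered index types are "onheap" and "offheap":

    // JUnit 4 substitutes {index} with the parameter-set position and {0}
    // with the first constructor argument, so runs are labeled like:
    //   testSanity[0: onheap]
    //   testSanity[1: offheap]
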
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index b8b4e70..4a6b38b 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -19,10 +19,10 @@
 
 package org.apache.druid.segment.incremental;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
-import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.DoubleDimensionSchema;
@@ -31,7 +31,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -39,53 +38,38 @@ import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
 import org.apache.druid.query.filter.SelectorDimFilter;
 import org.apache.druid.segment.CloserRule;
 import org.apache.druid.testing.InitializedNullHandlingTest;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 
 /**
  */
 @RunWith(Parameterized.class)
 public class IncrementalIndexTest extends InitializedNullHandlingTest
 {
-  interface IndexCreator
-  {
-    IncrementalIndex createIndex();
-  }
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
+  public final IncrementalIndexCreator indexCreator;
 
   @Rule
-  public final CloserRule closerRule = new CloserRule(false);
-
-  private final IndexCreator indexCreator;
-  private final Closer resourceCloser;
+  public final CloserRule closer = new CloserRule(false);
 
-  @After
-  public void teardown() throws IOException
+  public IncrementalIndexTest(
+      String indexType,
+      String mode,
+      boolean deserializeComplexMetrics,
+      IncrementalIndexSchema schema
+  ) throws JsonProcessingException
   {
-    resourceCloser.close();
-  }
-
-  public IncrementalIndexTest(IndexCreator IndexCreator, Closer resourceCloser)
-  {
-    this.indexCreator = IndexCreator;
-    this.resourceCloser = resourceCloser;
+    indexCreator = closer.closeLater(new IncrementalIndexCreator(indexType, (builder, args) -> builder
+        .setIndexSchema(schema)
+        .setDeserializeComplexMetrics(deserializeComplexMetrics)
+        .setSortFacts("rollup".equals(mode))
+        .setMaxRowCount(1_000_000)
+        .build())
+    );
   }
 
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "{index}: {0}, {1}, deserialize={2}")
   public static Collection<?> constructorFeeder()
   {
     DimensionsSpec dimensions = new DimensionsSpec(
@@ -108,59 +92,17 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
         .withMetrics(metrics)
         .build();
 
-    final List<Object[]> constructors = new ArrayList<>();
-    for (final Boolean sortFacts : ImmutableList.of(false, true)) {
-      constructors.add(
-          new Object[]{
-              new IndexCreator()
-              {
-                @Override
-                public IncrementalIndex createIndex()
-                {
-                  return new OnheapIncrementalIndex.Builder()
-                      .setIndexSchema(schema)
-                      .setDeserializeComplexMetrics(false)
-                      .setSortFacts(sortFacts)
-                      .setMaxRowCount(1000)
-                      .build();
-                }
-              },
-              Closer.create()
-          }
-      );
-      final Closer poolCloser = Closer.create();
-      final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
-          "OffheapIncrementalIndex-bufferPool",
-          () -> ByteBuffer.allocate(256 * 1024)
-      );
-      poolCloser.register(stupidPool);
-      constructors.add(
-          new Object[]{
-              new IndexCreator()
-              {
-                @Override
-                public IncrementalIndex createIndex()
-                {
-                  return new OffheapIncrementalIndex.Builder()
-                      .setBufferPool(stupidPool)
-                      .setIndexSchema(schema)
-                      .setSortFacts(sortFacts)
-                      .setMaxRowCount(1000000)
-                      .build();
-                }
-              },
-              poolCloser
-          }
-      );
-    }
-
-    return constructors;
+    return IncrementalIndexCreator.indexTypeCartesianProduct(
+        ImmutableList.of("rollup", "plain"),
+        ImmutableList.of(true, false),
+        ImmutableList.of(schema)
+    );
   }
 
   @Test(expected = ISE.class)
   public void testDuplicateDimensions() throws IndexSizeExceededException
   {
-    IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             System.currentTimeMillis() - 1,
@@ -180,7 +122,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test(expected = ISE.class)
   public void testDuplicateDimensionsFirstOccurrence() throws IndexSizeExceededException
   {
-    IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             System.currentTimeMillis() - 1,
@@ -193,7 +135,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void controlTest() throws IndexSizeExceededException
   {
-    IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(
         new MapBasedInputRow(
             System.currentTimeMillis() - 1,
@@ -220,7 +162,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
   @Test
   public void testUnparseableNumerics() throws IndexSizeExceededException
   {
-    IncrementalIndex<?> index = closerRule.closeLater(indexCreator.createIndex());
+    IncrementalIndex<?> index = indexCreator.createIndex();
 
     IncrementalIndexAddResult result;
     result = index.add(
@@ -286,7 +228,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
         Lists.newArrayList("billy", "joe"),
         ImmutableMap.of("billy", "A", "joe", "B")
     );
-    IncrementalIndex index = closerRule.closeLater(indexCreator.createIndex());
+    IncrementalIndex<?> index = indexCreator.createIndex();
     index.add(row);
     index.add(row);
     index.add(row);
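
indexTypeCartesianProduct crosses every registered index type with the argument
lists that follow it, replacing the hand-rolled nested constructor loops removed
above. A hedged sketch of the parameter sets it would produce here, assuming two
registered index types:

    // 2 index types x 2 modes x 2 booleans x 1 schema = 8 parameter sets:
    //   { "onheap",  "rollup", true,  schema }
    //   { "onheap",  "rollup", false, schema }
    //   { "onheap",  "plain",  true,  schema }
    //   { "onheap",  "plain",  false, schema }
    //   { "offheap", "rollup", true,  schema }
    //   ... and the three remaining "offheap" combinations.
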
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java b/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java
new file mode 100644
index 0000000..9259731
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/OffheapIncrementalIndexTestSpec.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Supplier;
+import org.apache.druid.collections.CloseableStupidPool;
+import org.apache.druid.utils.JvmUtils;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+
+/**
+ * OffheapIncrementalIndexTestSpec describes the off-heap indexing method for data ingestion.
+ * It also acts as a ByteBuffer supplier for the created off-heap incremental index.
+ *
+ * Note: since the off-heap incremental index is not yet supported in production ingestion, we define its spec here
+ * only for testing purposes.
+ */
+public class OffheapIncrementalIndexTestSpec implements AppendableIndexSpec, Supplier<ByteBuffer>, Closeable
+{
+  public static final String TYPE = "offheap";
+  static final int DEFAULT_BUFFER_SIZE = 1 << 23; // 8 MiB per buffer
+  static final int DEFAULT_CACHE_SIZE = 1 << 30;  // 1 GiB, i.e. 128 default-sized buffers
+
+  final int bufferSize;
+  final int cacheSize;
+
+  final CloseableStupidPool<ByteBuffer> bufferPool;
+
+  @JsonCreator
+  public OffheapIncrementalIndexTestSpec(
+      final @JsonProperty("bufferSize") @Nullable Integer bufferSize,
+      final @JsonProperty("cacheSize") @Nullable Integer cacheSize
+  )
+  {
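+    // Fall back to the defaults when a size is absent or inconsistent; in
+    // particular, the cache must be able to hold at least one buffer.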
+    this.bufferSize = bufferSize != null && bufferSize > 0 ? bufferSize : DEFAULT_BUFFER_SIZE;
+    this.cacheSize = cacheSize != null && cacheSize > this.bufferSize ? cacheSize : DEFAULT_CACHE_SIZE;
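+    // This spec is itself the ByteBuffer supplier (see get() below); the pool
+    // starts empty and caches at most cacheSize / bufferSize buffers.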
+    this.bufferPool = new CloseableStupidPool<>(
+        "Off-heap incremental-index buffer pool",
+        this,
+        0,
+        this.cacheSize / this.bufferSize
+    );
+  }
+
+  @JsonProperty
+  public int getBufferSize()
+  {
+    return bufferSize;
+  }
+
+  @JsonProperty
+  public int getCacheSize()
+  {
+    return cacheSize;
+  }
+
+  @Override
+  public AppendableIndexBuilder builder()
+  {
+    return new OffheapIncrementalIndex.Builder().setBufferPool(bufferPool);
+  }
+
+  @Override
+  public long getDefaultMaxBytesInMemory()
+  {
+    // In the realtime node, the entire direct memory of the JVM is utilized for ingestion and persist operations,
+    // but maxBytesInMemory refers only to the active index size, not to the index currently being flushed to disk
+    // or to the persist buffer.
+    // To account for that, the default is set to half of the JVM's max direct memory.
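+    // For example, with -XX:MaxDirectMemorySize=4g the default comes out to 2 GiB.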
+    return JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes() / 2;
+  }
+
+  // Supplier<ByteBuffer> and Closeable interface implementation
+
+  @Override
+  public ByteBuffer get()
+  {
+    return ByteBuffer.allocateDirect(bufferSize);
+  }
+
+  @Override
+  public void close()
+  {
+    bufferPool.close();
+  }
+}
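
Since the new spec is both an index factory and a closeable direct-buffer pool,
a test that uses it outside of IncrementalIndexCreator must close it after the
index it built. A hedged usage sketch (the sizes are arbitrary; the builder
setters are the ones used elsewhere in this patch):

    OffheapIncrementalIndexTestSpec spec = new OffheapIncrementalIndexTestSpec(1 << 16, 1 << 20);
    try (IncrementalIndex<?> index = spec.builder()
        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
        .setMaxRowCount(1_000)
        .build()) {
      // ... add rows and run assertions against the off-heap index ...
    }
    finally {
      spec.close(); // releases the direct-memory buffer pool
    }
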
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
index 5bd1a59..64da13d 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionSelectorsTest.java
@@ -48,6 +48,7 @@ import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
 import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -360,7 +361,7 @@ public class ExpressionSelectorsTest extends InitializedNullHandlingTest
         true
     );
 
-    IncrementalIndex index = new IncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).buildOnheap();
+    IncrementalIndex index = new OnheapIncrementalIndex.Builder().setMaxRowCount(100).setIndexSchema(schema).build();
     index.add(
         new MapBasedInputRow(
             DateTimes.nowUtc().getMillis(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1bb58ab..10a7ec9 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -126,7 +127,7 @@ public class IngestSegmentFirehoseTest
 
     try (
         final QueryableIndex qi = indexIO.loadIndex(segmentDir);
-        final IncrementalIndex index = new IncrementalIndex.Builder()
+        final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
             .setIndexSchema(
                 new IncrementalIndexSchema.Builder()
                     .withDimensionsSpec(DIMENSIONS_SPEC_REINDEX)
@@ -134,7 +135,7 @@ public class IngestSegmentFirehoseTest
                     .build()
             )
             .setMaxRowCount(5000)
-            .buildOnheap()
+            .build()
     ) {
       final StorageAdapter sa = new QueryableIndexStorageAdapter(qi);
       final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval());
@@ -216,7 +217,7 @@ public class IngestSegmentFirehoseTest
     );
 
     try (
-        final IncrementalIndex index = new IncrementalIndex.Builder()
+        final IncrementalIndex index = new OnheapIncrementalIndex.Builder()
             .setIndexSchema(
                 new IncrementalIndexSchema.Builder()
                     .withDimensionsSpec(parser.getParseSpec().getDimensionsSpec())
@@ -224,7 +225,7 @@ public class IngestSegmentFirehoseTest
                     .build()
             )
             .setMaxRowCount(5000)
-            .buildOnheap()
+            .build()
     ) {
       for (String line : rows) {
         index.add(parser.parse(line));
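
Across these last two files the migration is purely mechanical: the removed
generic builder with its type-specific build method gives way to a type-specific
builder with a uniform build(). Side by side, with the schema setup elided:

    // Before: generic builder, type-specific build method.
    IncrementalIndex index = new IncrementalIndex.Builder()
        .setIndexSchema(schema)
        .setMaxRowCount(5000)
        .buildOnheap();

    // After: type-specific builder, uniform build() method.
    IncrementalIndex index = new OnheapIncrementalIndex.Builder()
        .setIndexSchema(schema)
        .setMaxRowCount(5000)
        .build();
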

