You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/16 20:22:16 UTC

[pinot] branch master updated: make index creator provision pluggable (#7885)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f9ab252  make index creator provision pluggable (#7885)
f9ab252 is described below

commit f9ab252980e4f973d60b9db2a0f5e7d5764bdaf2
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Thu Dec 16 20:21:48 2021 +0000

    make index creator provision pluggable (#7885)
    
    This allows index creation to be intercepted, so that the current static logic in SegmentIndexCreator can be extended or overridden. This is achieved by introducing a new interface IndexCreatorProvider which provides various new index creators from an IndexCreationContext which bundles all information about index creation. External users can register a decorator which can enhance or entirely replace the default index creator provision logic. Typically, a registered decorator should pa [...]
---
 .../pinot/core/minion/RawIndexConverter.java       |  19 +-
 .../org/apache/pinot/perf/BenchmarkRangeIndex.java |  11 +-
 .../ConvertToRawIndexTaskExecutor.java             |   4 +-
 pinot-segment-local/pom.xml                        |   3 +-
 .../creator/impl/DefaultIndexCreatorProvider.java  | 274 ++++++++++++
 .../creator/impl/SegmentColumnarIndexCreator.java  | 290 +++----------
 .../impl/inv/BitSlicedRangeIndexCreator.java       |  55 ++-
 .../segment/index/loader/IndexHandlerFactory.java  |  20 +-
 .../loader/bloomfilter/BloomFilterHandler.java     |  12 +-
 .../loader/invertedindex/FSTIndexHandler.java      |  17 +-
 .../index/loader/invertedindex/H3IndexHandler.java |  12 +-
 .../loader/invertedindex/InvertedIndexHandler.java |  14 +-
 .../loader/invertedindex/JsonIndexHandler.java     |  16 +-
 .../loader/invertedindex/RangeIndexHandler.java    |  25 +-
 .../loader/invertedindex/TextIndexHandler.java     |  10 +-
 .../creator/impl/IndexCreatorOverrideTest.java     |  88 ++++
 .../index/creator/BitSlicedIndexCreatorTest.java   |  20 +-
 pinot-segment-spi/pom.xml                          |   6 +
 .../spi/creator/BloomFilterCreatorProvider.java    |  36 ++
 .../spi/creator/ForwardIndexCreatorProvider.java   |  35 ++
 .../creator/GeoSpatialIndexCreatorProvider.java    |  37 ++
 .../segment/spi/creator/IndexCreationContext.java  | 467 +++++++++++++++++++++
 .../segment/spi/creator/IndexCreatorProvider.java  |  28 ++
 .../segment/spi/creator/IndexCreatorProviders.java | 159 +++++++
 .../spi/creator/InvertedIndexCreatorProvider.java  |  36 ++
 .../spi/creator/JsonIndexCreatorProvider.java      |  36 ++
 .../spi/creator/RangeIndexCreatorProvider.java     |  36 ++
 .../spi/creator/TextIndexCreatorProvider.java      |  37 ++
 .../spi/index/metadata/ColumnMetadataImpl.java     |   4 +
 .../spi/creator/IndexCreatorProvidersTest.java     |  53 +++
 .../converter/DictionaryToRawIndexConverter.java   |   4 +-
 31 files changed, 1538 insertions(+), 326 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index 78552dd..f6d2a7d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -26,8 +26,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.utils.CrcUtils;
@@ -36,12 +34,15 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -82,13 +83,14 @@ public class RawIndexConverter {
   private final File _convertedIndexDir;
   private final PropertiesConfiguration _convertedProperties;
   private final String _columnsToConvert;
+  private final ForwardIndexCreatorProvider _indexCreatorProvider;
 
   /**
    * NOTE: original segment should be in V1 format.
    * TODO: support V3 format
    */
   public RawIndexConverter(String rawTableName, File originalIndexDir, File convertedIndexDir,
-      @Nullable String columnsToConvert)
+      @Nullable String columnsToConvert, ForwardIndexCreatorProvider indexCreatorProvider)
       throws Exception {
     FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
@@ -101,6 +103,7 @@ public class RawIndexConverter {
     _convertedProperties =
         new PropertiesConfiguration(new File(_convertedIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
     _columnsToConvert = columnsToConvert;
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   public boolean convert()
@@ -205,11 +208,11 @@ public class RawIndexConverter {
     assert dictionary != null;
     DataType storedType = dictionary.getValueType();
     int numDocs = _originalSegmentMetadata.getTotalDocs();
-    int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
-    try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.LZ4, columnName,
-            storedType, numDocs, lengthOfLongestEntry, false,
-            BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    ColumnMetadata columnMetadata = _originalSegmentMetadata.getColumnMetadataFor(columnName);
+    try (ForwardIndexCreator rawIndexCreator = _indexCreatorProvider.newForwardIndexCreator(
+        IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
+            .withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue()))
+            .withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {
         case INT:
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
index 60db2a3..e250eb6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
@@ -191,14 +191,7 @@ public class BenchmarkRangeIndex {
     public void setup()
         throws IOException {
       super.setup();
-      ColumnMetadata metadata = new ColumnMetadataImpl.Builder()
-          .setFieldSpec(_fieldSpec)
-          .setTotalDocs(_numDocs)
-          .setHasDictionary(false)
-          .setMaxValue(max())
-          .setMinValue(min())
-          .build();
-      _creator = new BitSlicedRangeIndexCreator(_indexDir, metadata);
+      _creator = new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max());
     }
   }
 
@@ -328,7 +321,7 @@ public class BenchmarkRangeIndex {
 
     @Override
     protected RawValueBasedInvertedIndexCreator newCreator() {
-      return new BitSlicedRangeIndexCreator(_indexDir, metadata());
+      return new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max());
     }
   }
 
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
index 7a5a33f..ba3e610 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
 import org.apache.pinot.core.minion.RawIndexConverter;
 import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -39,7 +40,8 @@ public class ConvertToRawIndexTaskExecutor extends BaseSingleSegmentConversionEx
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     new RawIndexConverter(rawTableName, indexDir, workingDir,
-        configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY)).convert();
+        configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
+        IndexCreatorProviders.getIndexCreatorProvider()).convert();
     return new SegmentConversionResult.Builder().setFile(workingDir)
         .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
         .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml
index c60da9c..e1bab72 100644
--- a/pinot-segment-local/pom.xml
+++ b/pinot-segment-local/pom.xml
@@ -122,9 +122,10 @@
       <artifactId>testng</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- required for static mock in IndexCreatorOverrideTest -->
     <dependency>
       <groupId>org.mockito</groupId>
-      <artifactId>mockito-core</artifactId>
+      <artifactId>mockito-inline</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
new file mode 100644
index 0000000..2753459
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * This class centralizes logic for how to create indexes. It can be overridden
+ * by SPI {@see IndexCreatorProviders} and should not be constructed directly, but
+ * accessed only via {@see IndexCreatorProviders#getIndexCreatorProvider}. Unless
+ * a user provides an override, this is the logic which will be used to create
+ * each index type.
+ */
+public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
+
+  @Override
+  public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+      throws Exception {
+    if (!context.hasDictionary()) {
+      boolean deriveNumDocsPerChunk =
+          shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(), context.getColumnProperties());
+      int writerVersion = getRawIndexWriterVersion(context.getFieldSpec().getName(), context.getColumnProperties());
+      if (context.getFieldSpec().isSingleValueField()) {
+        return getRawIndexCreatorForSVColumn(context.getIndexDir(), context.getChunkCompressionType(),
+            context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion);
+      } else {
+        return getRawIndexCreatorForMVColumn(context.getIndexDir(), context.getChunkCompressionType(),
+            context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(),
+            context.getTotalDocs(), context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
+            context.getMaxRowLengthInBytes());
+      }
+    } else {
+      if (context.getFieldSpec().isSingleValueField()) {
+        if (context.isSorted()) {
+          return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+              context.getCardinality());
+        } else {
+          return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+              context.getCardinality(), context.getTotalDocs());
+        }
+      } else {
+        return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+            context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
+      }
+    }
+  }
+
+  @Override
+  public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+      throws IOException {
+    if (context.isOnHeap()) {
+      return new OnHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+          context.getCardinality());
+    } else {
+      return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec(),
+          context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
+    }
+  }
+
+  @Override
+  public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+      throws IOException {
+    Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+        "Json index is currently only supported on single-value columns");
+    Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+        "Json index is currently only supported on STRING columns");
+    return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName())
+        : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName());
+  }
+
+  @Override
+  public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+      throws IOException {
+    if (context.isFst()) {
+      Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+          "FST index is currently only supported on single-value columns");
+      Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+          "FST index is currently only supported on STRING type columns");
+      Preconditions.checkState(context.hasDictionary(),
+          "FST index is currently only supported on dictionary-encoded columns");
+      String[] sortedValues = context.getSortedUniqueElementsArray();
+      if (context.getFstType() == FSTType.NATIVE) {
+        return new NativeFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues);
+      } else {
+        return new LuceneFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues);
+      }
+    } else {
+      Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+          "Text index is currently only supported on STRING type columns");
+      return new LuceneTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir(),
+          context.isCommitOnClose());
+    }
+  }
+
+  @Override
+  public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+      throws IOException {
+    Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+        "H3 index is currently only supported on single-value columns");
+    Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.BYTES,
+        "H3 index is currently only supported on BYTES columns");
+    H3IndexResolution resolution = Objects.requireNonNull(context.getH3IndexConfig()).getResolution();
+    return context.isOnHeap() ? new OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+        resolution) : new OffHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), resolution);
+  }
+
+  public static boolean shouldDeriveNumDocsPerChunk(String columnName,
+      Map<String, Map<String, String>> columnProperties) {
+    if (columnProperties != null) {
+      Map<String, String> properties = columnProperties.get(columnName);
+      return properties != null && Boolean.parseBoolean(
+          properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
+    }
+    return false;
+  }
+
+  public static int getRawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
+    if (columnProperties != null && columnProperties.get(columnName) != null) {
+      Map<String, String> properties = columnProperties.get(columnName);
+      String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
+      if (version == null) {
+        return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+      }
+      return Integer.parseInt(version);
+    }
+    return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is single valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param lengthOfLongestEntry Length of longest entry
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
+      String column, FieldSpec.DataType dataType, int totalDocs, int lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk,
+      int writerVersion)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+            writerVersion);
+      case STRING:
+      case BYTES:
+        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+            lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
+      default:
+        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+    }
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is single valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @param maxRowLengthInBytes the length of the longest row in bytes
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
+      String column, FieldSpec.DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
+      boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+            maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
+      case STRING:
+      case BYTES:
+        return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
+            maxRowLengthInBytes, maxNumberOfMultiValueElements);
+      default:
+        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+    }
+  }
+
+  @Override
+  public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+      throws IOException {
+    return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+        context.getCardinality(), Objects.requireNonNull(context.getBloomFilterConfig()));
+  }
+
+  @Override
+  public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+      throws IOException {
+    if (context.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION && context.getFieldSpec()
+        .isSingleValueField()) {
+      if (context.hasDictionary()) {
+        return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), context.getCardinality());
+      }
+      return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), context.getMin(),
+          context.getMax());
+    }
+    // default to RangeIndexCreator for the time being
+    return new RangeIndexCreator(context.getIndexDir(), context.getFieldSpec(),
+        context.hasDictionary() ? FieldSpec.DataType.INT : context.getFieldSpec().getDataType(), -1,
+        -1, context.getTotalDocs(), context.getTotalNumberOfEntries());
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 764d36f..b01dd3f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -33,28 +33,14 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
 import org.apache.pinot.segment.spi.creator.SegmentCreator;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -64,9 +50,7 @@ import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.config.table.FSTType;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
@@ -99,6 +83,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   private SegmentGeneratorConfig _config;
   private Map<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
+  private final IndexCreatorProvider _indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
   private final Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
   private final Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
   private final Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
@@ -179,126 +164,70 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       }
 
       String columnName = fieldSpec.getName();
-      DataType storedType = fieldSpec.getDataType().getStoredType();
-      ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
-      Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
-      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
-
+      ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName);
+      Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName);
+      boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec);
+      Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName),
+          "Cannot create inverted index for raw index column: %s", columnName);
+
+      IndexCreationContext.Common context = IndexCreationContext.builder()
+          .withIndexDir(_indexDir)
+          .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
+          .withDictionary(dictEnabledColumn)
+          .withFieldSpec(fieldSpec)
+          .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
+          .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
+          .withColumnIndexCreationInfo(columnIndexCreationInfo)
+          .sorted(columnIndexCreationInfo.isSorted())
+          .onHeap(segmentCreationSpec.isOnHeap())
+          .build();
+      // Initialize forward index creator
+      ChunkCompressionType chunkCompressionType =
+          dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec);
+      _forwardIndexCreatorMap.put(columnName, _indexCreatorProvider.newForwardIndexCreator(
+          context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties())));
+
+      // Initialize inverted index creator; skip creating inverted index if sorted
+      if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) {
+        _invertedIndexCreatorMap.put(columnName,
+            _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
+      }
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
-
         // Initialize dictionary creator
         SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
-                indexCreationInfo.isUseVarLengthDictionary());
+            new SegmentDictionaryCreator(columnIndexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+                columnIndexCreationInfo.isUseVarLengthDictionary());
         _dictionaryCreatorMap.put(columnName, dictionaryCreator);
-
         // Create dictionary
         try {
           dictionaryCreator.build();
         } catch (Exception e) {
           LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
-              fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry());
+              fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(),
+              dictionaryCreator.getNumBytesPerEntry());
           throw e;
         }
-
-        // Initialize forward index creator
-        int cardinality = indexCreationInfo.getDistinctValueCount();
-        if (fieldSpec.isSingleValueField()) {
-          if (indexCreationInfo.isSorted()) {
-            _forwardIndexCreatorMap.put(columnName,
-                new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
-          } else {
-            _forwardIndexCreatorMap.put(columnName,
-                new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
-          }
-        } else {
-          _forwardIndexCreatorMap.put(columnName,
-              new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs,
-                  indexCreationInfo.getTotalNumberOfEntries()));
-        }
-
-        // Initialize inverted index creator; skip creating inverted index if sorted
-        if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
-          if (segmentCreationSpec.isOnHeap()) {
-            _invertedIndexCreatorMap.put(columnName,
-                new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
-          } else {
-            _invertedIndexCreatorMap.put(columnName,
-                new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
-                    indexCreationInfo.getTotalNumberOfEntries()));
-          }
-        }
-      } else {
-        // Create raw index
-        Preconditions.checkState(!invertedIndexColumns.contains(columnName),
-            "Cannot create inverted index for raw index column: %s", columnName);
-
-        ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec);
-
-        // Initialize forward index creator
-        boolean deriveNumDocsPerChunk =
-            shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
-        int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
-        if (fieldSpec.isSingleValueField()) {
-          _forwardIndexCreatorMap.put(columnName,
-              getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
-                  indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
-        } else {
-          _forwardIndexCreatorMap.put(columnName,
-              getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
-                  indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
-                  indexCreationInfo.getMaxRowLengthInBytes()));
-        }
       }
 
       if (textIndexColumns.contains(columnName)) {
-        // Initialize text index creator
-        Preconditions.checkState(storedType == DataType.STRING,
-            "Text index is currently only supported on STRING type columns");
-        _textIndexCreatorMap.put(columnName,
-            new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+        _textIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator(context.forTextIndex(true)));
       }
 
       if (fstIndexColumns.contains(columnName)) {
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            "FST index is currently only supported on single-value columns");
-        Preconditions.checkState(storedType == DataType.STRING,
-            "FST index is currently only supported on STRING type columns");
-        Preconditions.checkState(dictEnabledColumn,
-            "FST index is currently only supported on dictionary-encoded columns");
-        String[] sortedValues = (String[]) indexCreationInfo.getSortedUniqueElementsArray();
-        TextIndexCreator textIndexCreator;
-        if (_config.getFSTIndexType() == FSTType.NATIVE) {
-          textIndexCreator = new NativeFSTIndexCreator(_indexDir, columnName, sortedValues);
-        } else {
-          textIndexCreator = new LuceneFSTIndexCreator(_indexDir, columnName, sortedValues);
-        }
-
-        _fstIndexCreatorMap.put(columnName, textIndexCreator);
+        _fstIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator(
+            context.forFSTIndex(_config.getFSTIndexType(),
+                (String[]) columnIndexCreationInfo.getSortedUniqueElementsArray())));
       }
 
       if (jsonIndexColumns.contains(columnName)) {
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            "Json index is currently only supported on single-value columns");
-        Preconditions.checkState(storedType == DataType.STRING,
-            "Json index is currently only supported on STRING columns");
-        JsonIndexCreator jsonIndexCreator =
-            segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
-                : new OffHeapJsonIndexCreator(_indexDir, columnName);
-        _jsonIndexCreatorMap.put(columnName, jsonIndexCreator);
+        _jsonIndexCreatorMap.put(columnName, _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex()));
       }
 
       H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
       if (h3IndexConfig != null) {
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            "H3 index is currently only supported on single-value columns");
-        Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
-        H3IndexResolution resolution = h3IndexConfig.getResolution();
-        GeoSpatialIndexCreator h3IndexCreator =
-            segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution)
-                : new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
-        _h3IndexCreatorMap.put(columnName, h3IndexCreator);
+        _h3IndexCreatorMap.put(columnName,
+            _indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig)));
       }
 
       _nullHandlingEnabled = _config.isNullHandlingEnabled();
@@ -309,26 +238,29 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
-  public static boolean shouldDeriveNumDocsPerChunk(String columnName,
-      Map<String, Map<String, String>> columnProperties) {
-    if (columnProperties != null) {
-      Map<String, String> properties = columnProperties.get(columnName);
-      return properties != null && Boolean.parseBoolean(
-          properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
-    }
-    return false;
-  }
-
-  public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
-    if (columnProperties != null && columnProperties.get(columnName) != null) {
-      Map<String, String> properties = columnProperties.get(columnName);
-      String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
-      if (version == null) {
-        return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
-      }
-      return Integer.parseInt(version);
+  /**
+   * Returns true if dictionary should be created for a column, false otherwise.
+   * Currently there are two sources for this config:
+   * <ul>
+   *   <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+   *   <li> SegmentGeneratorConfig</li>
+   * </ul>
+   *
+   * This method gives preference to the SegmentGeneratorConfig first.
+   *
+   * @param info Column index creation info
+   * @param config Segment generation config
+   * @param spec Field spec for the column
+   * @return True if dictionary should be created for the column, false otherwise
+   */
+  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+      FieldSpec spec) {
+    String column = spec.getName();
+    if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
+        .containsKey(column)) {
+      return false;
     }
-    return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+    return info.isCreateDictionary();
   }
 
   /**
@@ -347,7 +279,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       FieldSpec fieldSpec) {
     ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
     if (compressionType == null) {
-      if (fieldSpec.getFieldType() == FieldType.METRIC) {
+      if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
         return ChunkCompressionType.PASS_THROUGH;
       } else {
         return ChunkCompressionType.LZ4;
@@ -357,31 +289,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
-  /**
-   * Returns true if dictionary should be created for a column, false otherwise.
-   * Currently there are two sources for this config:
-   * <ul>
-   *   <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
-   *   <li> SegmentGeneratorConfig</li>
-   * </ul>
-   *
-   * This method gives preference to the SegmentGeneratorConfig first.
-   *
-   * @param info Column index creation info
-   * @param config Segment generation config
-   * @param spec Field spec for the column
-   * @return True if dictionary should be created for the column, false otherwise
-   */
-  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
-      FieldSpec spec) {
-    String column = spec.getName();
-    if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
-        .containsKey(column)) {
-      return false;
-    }
-    return info.isCreateDictionary();
-  }
-
   @Override
   public void indexRow(GenericRow row)
       throws IOException {
@@ -795,71 +702,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear();
   }
 
-  /**
-   * Helper method to build the raw index creator for the column.
-   * Assumes that column to be indexed is single valued.
-   *
-   * @param file Output index file
-   * @param column Column name
-   * @param totalDocs Total number of documents to index
-   * @param lengthOfLongestEntry Length of longest entry
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
-   * @param writerVersion version to use for the raw index writer
-   * @return raw index creator
-   */
-  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
-      int writerVersion)
-      throws IOException {
-    switch (dataType.getStoredType()) {
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
-            writerVersion);
-      case STRING:
-      case BYTES:
-        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
-            lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
-      default:
-        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
-    }
-  }
-
-  /**
-   * Helper method to build the raw index creator for the column.
-   * Assumes that column to be indexed is single valued.
-   *
-   * @param file Output index file
-   * @param column Column name
-   * @param totalDocs Total number of documents to index
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
-   *     per chunk
-   * @param writerVersion version to use for the raw index writer
-   * @param maxRowLengthInBytes the length of the longest row in bytes
-   * @return raw index creator
-   */
-  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
-      boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
-      throws IOException {
-    switch (dataType.getStoredType()) {
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-        return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
-            maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
-      case STRING:
-      case BYTES:
-        return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
-            maxRowLengthInBytes, maxNumberOfMultiValueElements);
-      default:
-        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
-    }
-  }
-
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
index 4c0b98b..0f96bf4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
@@ -18,10 +18,10 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.inv;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import org.apache.pinot.segment.local.utils.FPOrdering;
-import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.roaringbitmap.RangeBitmap;
@@ -41,13 +41,33 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
   private final File _rangeIndexFile;
   private final long _minValue;
 
-  public BitSlicedRangeIndexCreator(File indexDir, ColumnMetadata metadata) {
-    if (!metadata.isSingleValue()) {
-      throw new IllegalArgumentException("MV columns not supported");
-    }
-    _appender = RangeBitmap.appender(maxValue(metadata));
-    _rangeIndexFile = new File(indexDir, metadata.getColumnName() + BITMAP_RANGE_INDEX_FILE_EXTENSION);
-    _minValue = minValue(metadata);
+  private BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, long minValue, long maxValue) {
+    Preconditions.checkArgument(fieldSpec.isSingleValueField(), "MV columns not supported");
+    _rangeIndexFile = new File(indexDir, fieldSpec.getName() + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    _appender = RangeBitmap.appender(maxValue);
+    _minValue = minValue;
+  }
+
+  /**
+   * For dictionarized columns
+   * @param indexDir the directory for the index
+   * @param fieldSpec the specification of the field
+   * @param cardinality the cardinality of the dictionary
+   */
+  public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, int cardinality) {
+    this(indexDir, fieldSpec, 0, cardinality - 1);
+  }
+
+  /**
+   * For raw columns
+   * @param indexDir the directory for the index
+   * @param fieldSpec the specification of the field
+   * @param minValue the minimum value
+   * @param maxValue the maximum value
+   */
+  public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, Comparable<?> minValue,
+      Comparable<?> maxValue) {
+    this(indexDir, fieldSpec, minValue(fieldSpec, minValue), maxValue(fieldSpec, minValue, maxValue));
   }
 
   @Override
@@ -110,13 +130,8 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
       throws IOException {
   }
 
-  private static long maxValue(ColumnMetadata metadata) {
-    if (metadata.hasDictionary()) {
-      return metadata.getCardinality() - 1;
-    }
-    FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
-    Comparable<?> minValue = metadata.getMinValue();
-    Comparable<?> maxValue = metadata.getMaxValue();
+  private static long maxValue(FieldSpec fieldSpec, Comparable<?> minValue, Comparable<?> maxValue) {
+    FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
     if (storedType == INT || storedType == LONG) {
       return ((Number) maxValue).longValue() - ((Number) minValue).longValue();
     }
@@ -126,15 +141,11 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
     if (storedType == DOUBLE) {
       return 0xFFFFFFFFFFFFFFFFL;
     }
-    throw new IllegalArgumentException("Unsupported data type: " + metadata.getDataType());
+    throw new IllegalArgumentException("Unsupported data type: " + fieldSpec.getDataType());
   }
 
-  private static long minValue(ColumnMetadata metadata) {
-    if (metadata.hasDictionary()) {
-      return 0;
-    }
-    FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
-    Comparable<?> minValue = metadata.getMinValue();
+  private static long minValue(FieldSpec fieldSpec, Comparable<?> minValue) {
+    FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
     if (storedType == INT || storedType == LONG) {
       return ((Number) minValue).longValue();
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index 9dd9ecb..a6fe925 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -26,6 +26,8 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.Inverte
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonIndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -40,22 +42,26 @@ public class IndexHandlerFactory {
 
   public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata,
       IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) {
+    IndexCreatorProvider indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
     switch (type) {
       case INVERTED_INDEX:
-        return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+            indexCreatorProvider);
       case RANGE_INDEX:
-        return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+            indexCreatorProvider);
       case TEXT_INDEX:
-        return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
       case FST_INDEX:
         return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
-            indexLoadingConfig.getFSTIndexType());
+            indexLoadingConfig.getFSTIndexType(), indexCreatorProvider);
       case JSON_INDEX:
-        return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
       case H3_INDEX:
-        return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
       case BLOOM_FILTER:
-        return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+        return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+            indexCreatorProvider);
       default:
         return NO_OP_HANDLER;
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index b8a3264..f992344 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -37,6 +36,8 @@ import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -57,13 +58,15 @@ public class BloomFilterHandler implements IndexHandler {
   private final SegmentMetadataImpl _segmentMetadata;
   private final SegmentDirectory.Writer _segmentWriter;
   private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
+  private final BloomFilterCreatorProvider _indexCreatorProvider;
 
   public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentWriter = segmentWriter;
     _segmentMetadata = segmentMetadata;
     _bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -112,8 +115,9 @@ public class BloomFilterHandler implements IndexHandler {
     BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
     LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName,
         bloomFilterConfig);
-    try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName,
-        columnMetadata.getCardinality(), bloomFilterConfig);
+    try (BloomFilterCreator bloomFilterCreator = _indexCreatorProvider.newBloomFilterCreator(
+        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+            .build().forBloomFilter(bloomFilterConfig));
         Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
       int length = dictionary.length();
       for (int i = 0; i < length; i++) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 50f2b67..292bf0c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -24,15 +24,15 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -71,14 +71,16 @@ public class FSTIndexHandler implements IndexHandler {
   private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
   private final FSTType _fstType;
+  private final TextIndexCreatorProvider _indexCreatorProvider;
 
   public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter, FSTType fstType) {
+      SegmentDirectory.Writer segmentWriter, FSTType fstType, TextIndexCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
     _fstType = fstType;
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -136,12 +138,9 @@ public class FSTIndexHandler implements IndexHandler {
     LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName,
         columnMetadata.getCardinality());
 
-    TextIndexCreator fstIndexCreator;
-    if (_fstType == FSTType.LUCENE) {
-      fstIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);
-    } else {
-      fstIndexCreator = new NativeFSTIndexCreator(_indexDir, column, null);
-    }
+    TextIndexCreator fstIndexCreator = _indexCreatorProvider.newTextIndexCreator(
+        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+            .build().forFSTIndex(_fstType, null));
 
     try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
       for (int dictId = 0; dictId < dictionary.length(); dictId++) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index ef96114..d2fceaa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -32,7 +32,10 @@ import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -53,13 +56,15 @@ public class H3IndexHandler implements IndexHandler {
   private final SegmentMetadataImpl _segmentMetadata;
   private final SegmentDirectory.Writer _segmentWriter;
   private final Map<String, H3IndexConfig> _h3Configs;
+  private final IndexCreatorProvider _indexCreatorProvider;
 
   public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _h3Configs = indexLoadingConfig.getH3IndexConfigs();
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -129,8 +134,9 @@ public class H3IndexHandler implements IndexHandler {
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
         Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
-        OffHeapH3IndexCreator h3IndexCreator = new OffHeapH3IndexCreator(_indexDir, columnName,
-            _h3Configs.get(columnName).getResolution())) {
+        GeoSpatialIndexCreator h3IndexCreator = _indexCreatorProvider.newGeoSpatialIndexCreator(
+            IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+                .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         int dictId = forwardIndexReader.getDictId(i, readerContext);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index ee9d78f..693ca32 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -23,14 +23,16 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -47,13 +49,15 @@ public class InvertedIndexHandler implements IndexHandler {
   private final SegmentMetadata _segmentMetadata;
   private final SegmentDirectory.Writer _segmentWriter;
   private final HashSet<String> _columnsToAddIdx;
+  private final InvertedIndexCreatorProvider _indexCreatorProvider;
 
   public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getInvertedIndexColumns());
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -100,9 +104,9 @@ public class InvertedIndexHandler implements IndexHandler {
     // Create new inverted index for the column.
     LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, column);
     int numDocs = columnMetadata.getTotalDocs();
-    try (OffHeapBitmapInvertedIndexCreator creator = new OffHeapBitmapInvertedIndexCreator(_indexDir,
-        columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs,
-        columnMetadata.getTotalNumberOfEntries())) {
+    try (DictionaryBasedInvertedIndexCreator creator = _indexCreatorProvider.newInvertedIndexCreator(
+        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+            .forInvertedIndex())) {
       try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
           ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
         if (columnMetadata.isSingleValue()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 93d52e8..27b8ade 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -24,14 +24,16 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -50,13 +52,15 @@ public class JsonIndexHandler implements IndexHandler {
   private final SegmentMetadata _segmentMetadata;
   private final SegmentDirectory.Writer _segmentWriter;
   private final HashSet<String> _columnsToAddIdx;
+  private final JsonIndexCreatorProvider _indexCreatorProvider;
 
   public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, JsonIndexCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns());
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -122,11 +126,11 @@ public class JsonIndexHandler implements IndexHandler {
 
   private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
       throws IOException {
-    String columnName = columnMetadata.getColumnName();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
         Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
-        OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) {
+        JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+            .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -138,10 +142,10 @@ public class JsonIndexHandler implements IndexHandler {
 
   private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
       throws IOException {
-    String columnName = columnMetadata.getColumnName();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) {
+        JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+            .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
       int numDocs = columnMetadata.getTotalDocs();
       for (int i = 0; i < numDocs; i++) {
         jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 55c964b..3d5551d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -23,21 +23,20 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
 import org.apache.pinot.segment.spi.store.ColumnIndexType;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,14 +50,16 @@ public class RangeIndexHandler implements IndexHandler {
   private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
   private final int _rangeIndexVersion;
+  private final RangeIndexCreatorProvider _indexCreatorProvider;
 
   public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider indexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getRangeIndexColumns());
     _rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion();
+    _indexCreatorProvider = indexCreatorProvider;
   }
 
   @Override
@@ -125,7 +126,7 @@ public class RangeIndexHandler implements IndexHandler {
     int numDocs = columnMetadata.getTotalDocs();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, FieldSpec.DataType.INT)) {
+        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
       if (columnMetadata.isSingleValue()) {
         // Single-value column
         for (int i = 0; i < numDocs; i++) {
@@ -148,8 +149,7 @@ public class RangeIndexHandler implements IndexHandler {
     int numDocs = columnMetadata.getTotalDocs();
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata,
-            columnMetadata.getDataType())) {
+        CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
       if (columnMetadata.isSingleValue()) {
         // Single-value column.
         switch (columnMetadata.getDataType()) {
@@ -216,13 +216,10 @@ public class RangeIndexHandler implements IndexHandler {
     }
   }
 
-  private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata, FieldSpec.DataType dataType)
+  private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata)
       throws IOException {
-    if (_rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION && columnMetadata.isSingleValue()) {
-      return new BitSlicedRangeIndexCreator(_indexDir, columnMetadata);
-    }
-    // default to RangeIndexCreator for the time being
-    return new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(), dataType, -1, -1,
-        columnMetadata.getTotalDocs(), columnMetadata.getTotalNumberOfEntries());
+    return _indexCreatorProvider.newRangeIndexCreator(
+        IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+            .forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(), columnMetadata.getMaxValue()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 19e3a91..6e7c64e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -40,13 +40,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
-import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
 import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
 import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -85,13 +86,15 @@ public class TextIndexHandler implements IndexHandler {
   private final SegmentMetadata _segmentMetadata;
   private final SegmentDirectory.Writer _segmentWriter;
   private final Set<String> _columnsToAddIdx;
+  private final TextIndexCreatorProvider _textIndexCreatorProvider;
 
   public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
-      SegmentDirectory.Writer segmentWriter) {
+      SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider textIndexCreatorProvider) {
     _indexDir = indexDir;
     _segmentMetadata = segmentMetadata;
     _segmentWriter = segmentWriter;
     _columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns());
+    _textIndexCreatorProvider = textIndexCreatorProvider;
   }
 
   @Override
@@ -144,7 +147,8 @@ public class TextIndexHandler implements IndexHandler {
     // based on segmentVersion.
     try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
         ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
-        LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
+        TextIndexCreator textIndexCreator = _textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
+            .withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true))) {
       if (columnMetadata.isSingleValue()) {
         processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
       } else {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
new file mode 100644
index 0000000..6e32884
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.apache.commons.io.FileUtils.deleteQuietly;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexCreatorOverrideTest {
+
+  private File _file;
+
+  @BeforeTest
+  public void before()
+      throws IOException {
+    _file = Files.createTempFile("IndexCreatorOverrideTest", UUID.randomUUID().toString()).toFile();
+  }
+
+  @AfterTest
+  public void cleanup() {
+    deleteQuietly(_file);
+  }
+
+  @Test
+  public void testOverrideInvertedIndexCreation()
+      throws IOException {
+    DictionaryBasedInvertedIndexCreator highCardinalityInvertedIndex = mock(DictionaryBasedInvertedIndexCreator.class);
+    IndexCreatorProvider provider = new IndexCreatorProviders.Default() {
+      @Override
+      public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+          throws IOException {
+        if (context.getCardinality() >= 10000) {
+          return highCardinalityInvertedIndex;
+        }
+        return super.newInvertedIndexCreator(context);
+      }
+    };
+    mockStatic(IndexCreatorProviders.class).when(IndexCreatorProviders::getIndexCreatorProvider).thenReturn(provider);
+    IndexCreationContext.Inverted highCardinalityContext = newContext(Integer.MAX_VALUE);
+    assertEquals(IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext),
+        highCardinalityInvertedIndex);
+    IndexCreationContext.Inverted lowCardinalityContext = newContext(1);
+    assertTrue(IndexCreatorProviders.getIndexCreatorProvider()
+        .newInvertedIndexCreator(lowCardinalityContext) instanceof OffHeapBitmapInvertedIndexCreator);
+  }
+
+  private IndexCreationContext.Inverted newContext(int cardinality) {
+    FieldSpec fieldSpec = new DimensionFieldSpec("test", FieldSpec.DataType.INT, true);
+    return IndexCreationContext.builder().withIndexDir(_file)
+        .withColumnMetadata(ColumnMetadataImpl.builder().setFieldSpec(fieldSpec).setCardinality(cardinality).build())
+        .build().forInvertedIndex();
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
index c06ebee..0ad213c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
@@ -65,14 +65,12 @@ public class BitSlicedIndexCreatorTest {
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testFailToCreateRawString() {
-    new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
-        .setFieldSpec(new DimensionFieldSpec("foo", STRING, true)).build());
+    new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", STRING, true), null, null);
   }
 
   @Test(expectedExceptions = IllegalArgumentException.class)
   public void testFailToCreateMV() {
-    new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
-        .setFieldSpec(new DimensionFieldSpec("foo", INT, false)).build());
+    new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", INT, false), 0, 10);
   }
 
   @Test
@@ -153,7 +151,7 @@ public class BitSlicedIndexCreatorTest {
   private void testInt(Dataset<int[]> dataset)
       throws IOException {
     ColumnMetadata metadata = dataset.toColumnMetadata();
-    try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+    try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
       for (int value : dataset.values()) {
         creator.add(value);
       }
@@ -181,7 +179,7 @@ public class BitSlicedIndexCreatorTest {
   private void testLong(Dataset<long[]> dataset)
       throws IOException {
     ColumnMetadata metadata = dataset.toColumnMetadata();
-    try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+    try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
       for (long value : dataset.values()) {
         creator.add(value);
       }
@@ -209,7 +207,7 @@ public class BitSlicedIndexCreatorTest {
   private void testFloat(Dataset<float[]> dataset)
       throws IOException {
     ColumnMetadata metadata = dataset.toColumnMetadata();
-    try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+    try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
       for (float value : dataset.values()) {
         creator.add(value);
       }
@@ -237,7 +235,7 @@ public class BitSlicedIndexCreatorTest {
   private void testDouble(Dataset<double[]> dataset)
       throws IOException {
     ColumnMetadata metadata = dataset.toColumnMetadata();
-    try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+    try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
       for (double value : dataset.values()) {
         creator.add(value);
       }
@@ -262,6 +260,12 @@ public class BitSlicedIndexCreatorTest {
     }
   }
 
+  private static BitSlicedRangeIndexCreator newBitslicedIndexCreator(ColumnMetadata metadata) {
+    return metadata.hasDictionary() ? new BitSlicedRangeIndexCreator(INDEX_DIR,
+        metadata.getFieldSpec(), metadata.getCardinality()) : new BitSlicedRangeIndexCreator(INDEX_DIR,
+        metadata.getFieldSpec(), metadata.getMinValue(), metadata.getMaxValue());
+  }
+
   enum Distribution {
     NORMAL {
       @Override
diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml
index 1af8184..89a56d9 100644
--- a/pinot-segment-spi/pom.xml
+++ b/pinot-segment-spi/pom.xml
@@ -93,5 +93,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 </project>
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
new file mode 100644
index 0000000..ba23359
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+
+
+public interface BloomFilterCreatorProvider {
+  /**
+   * Creates a {@see BloomFilterCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
new file mode 100644
index 0000000..8c15dbb
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+
+
+public interface ForwardIndexCreatorProvider {
+  /**
+   * Creates a {@see ForwardIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws Exception whenever something goes wrong matching or constructing the creator
+   */
+  ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+      throws Exception;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
new file mode 100644
index 0000000..34341a1
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+
+
+public interface GeoSpatialIndexCreatorProvider {
+
+  /**
+   * Creates a {@see GeoSpatialIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
new file mode 100644
index 0000000..c1a20ba
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Provides parameters for constructing indexes via {@see IndexCreatorProvider}.
+ * The responsibility for ensuring that the correct parameters for a particular
+ * index type lies with the caller.
+ */
+public interface IndexCreationContext {
+
+  FieldSpec getFieldSpec();
+
+  File getIndexDir();
+
+  boolean isOnHeap();
+
+  int getLengthOfLongestEntry();
+
+  int getMaxNumberOfMultiValueElements();
+
+  int getMaxRowLengthInBytes();
+
+  boolean isSorted();
+
+  int getCardinality();
+
+  int getTotalNumberOfEntries();
+
+  int getTotalDocs();
+
+  boolean hasDictionary();
+
+  final class Builder {
+    private File _indexDir;
+    private int _lengthOfLongestEntry;
+    private int _maxNumberOfMultiValueElements;
+    private int _maxRowLengthInBytes;
+    private boolean _onHeap = false;
+    private FieldSpec _fieldSpec;
+    private boolean _sorted;
+    private int _cardinality;
+    private int _totalNumberOfEntries;
+    private int _totalDocs;
+    private boolean _hasDictionary = true;
+
+    public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) {
+      return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
+          .withMaxNumberOfMultiValueElements(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())
+          .withMaxRowLengthInBytes(columnIndexCreationInfo.getMaxRowLengthInBytes());
+    }
+
+    public Builder withIndexDir(File indexDir) {
+      _indexDir = indexDir;
+      return this;
+    }
+
+    public Builder onHeap(boolean onHeap) {
+      _onHeap = onHeap;
+      return this;
+    }
+
+    public Builder withColumnMetadata(ColumnMetadata columnMetadata) {
+      return withFieldSpec(columnMetadata.getFieldSpec())
+          .sorted(columnMetadata.isSorted())
+          .withCardinality(columnMetadata.getCardinality())
+          .withTotalNumberOfEntries(columnMetadata.getTotalNumberOfEntries())
+          .withTotalDocs(columnMetadata.getTotalDocs())
+          .withDictionary(columnMetadata.hasDictionary());
+    }
+
+    public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) {
+      _lengthOfLongestEntry = lengthOfLongestEntry;
+      return this;
+    }
+
+    public Builder withMaxNumberOfMultiValueElements(int maxNumberOfMultiValueElements) {
+      _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+      return this;
+    }
+
+    public Builder withMaxRowLengthInBytes(int maxRowLengthInBytes) {
+      _maxRowLengthInBytes = maxRowLengthInBytes;
+      return this;
+    }
+
+    public Builder withFieldSpec(FieldSpec fieldSpec) {
+      _fieldSpec = fieldSpec;
+      return this;
+    }
+
+    public Builder sorted(boolean sorted) {
+      _sorted = sorted;
+      return this;
+    }
+
+    public Builder withCardinality(int cardinality) {
+      _cardinality = cardinality;
+      return this;
+    }
+
+    public Builder withTotalNumberOfEntries(int totalNumberOfEntries) {
+      _totalNumberOfEntries = totalNumberOfEntries;
+      return this;
+    }
+
+    public Builder withTotalDocs(int totalDocs) {
+      _totalDocs = totalDocs;
+      return this;
+    }
+
+    public Builder withDictionary(boolean hasDictionary) {
+      _hasDictionary = hasDictionary;
+      return this;
+    }
+
+    public Common build() {
+      return new Common(Objects.requireNonNull(_indexDir),
+          _lengthOfLongestEntry, _maxNumberOfMultiValueElements, _maxRowLengthInBytes,
+          _onHeap, Objects.requireNonNull(_fieldSpec),
+          _sorted, _cardinality, _totalNumberOfEntries, _totalDocs, _hasDictionary);
+    }
+  }
+
+  static Builder builder() {
+    return new Builder();
+  }
+
+  final class Common implements IndexCreationContext {
+
+    private final File _indexDir;
+    private final int _lengthOfLongestEntry;
+    private final int _maxNumberOfMultiValueElements;
+    private final int _maxRowLengthInBytes;
+    private final boolean _onHeap;
+    private final FieldSpec _fieldSpec;
+    private final boolean _sorted;
+    private final int _cardinality;
+    private final int _totalNumberOfEntries;
+    private final int _totalDocs;
+    private final boolean _hasDictionary;
+
+    public Common(File indexDir, int lengthOfLongestEntry,
+        int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap,
+        FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries,
+        int totalDocs, boolean hasDictionary) {
+      _indexDir = indexDir;
+      _lengthOfLongestEntry = lengthOfLongestEntry;
+      _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+      _maxRowLengthInBytes = maxRowLengthInBytes;
+      _onHeap = onHeap;
+      _fieldSpec = fieldSpec;
+      _sorted = sorted;
+      _cardinality = cardinality;
+      _totalNumberOfEntries = totalNumberOfEntries;
+      _totalDocs = totalDocs;
+      _hasDictionary = hasDictionary;
+    }
+
+    public FieldSpec getFieldSpec() {
+      return _fieldSpec;
+    }
+
+    public File getIndexDir() {
+      return _indexDir;
+    }
+
+    public boolean isOnHeap() {
+      return _onHeap;
+    }
+
+    public int getLengthOfLongestEntry() {
+      return _lengthOfLongestEntry;
+    }
+
+    public int getMaxNumberOfMultiValueElements() {
+      return _maxNumberOfMultiValueElements;
+    }
+
+    public int getMaxRowLengthInBytes() {
+      return _maxRowLengthInBytes;
+    }
+
+    public boolean isSorted() {
+      return _sorted;
+    }
+
+    public int getCardinality() {
+      return _cardinality;
+    }
+
+    public int getTotalNumberOfEntries() {
+      return _totalNumberOfEntries;
+    }
+
+    public int getTotalDocs() {
+      return _totalDocs;
+    }
+
+    public boolean hasDictionary() {
+      return _hasDictionary;
+    }
+
+    public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) {
+      return new BloomFilter(this, bloomFilterConfig);
+    }
+
+    public Forward forForwardIndex(ChunkCompressionType chunkCompressionType,
+        @Nullable Map<String, Map<String, String>> columnProperties) {
+      return new Forward(this, chunkCompressionType, columnProperties);
+    }
+
+    public Text forFSTIndex(FSTType fstType, String[] sortedUniqueElementsArray) {
+      return new Text(this, fstType, sortedUniqueElementsArray);
+    }
+
+    public Geospatial forGeospatialIndex(H3IndexConfig h3IndexConfig) {
+      return new Geospatial(this, h3IndexConfig);
+    }
+
+    public Inverted forInvertedIndex() {
+      return new Inverted(this);
+    }
+
+    public Json forJsonIndex() {
+      return new Json(this);
+    }
+
+    public Range forRangeIndex(int rangeIndexVersion, Comparable<?> min, Comparable<?> max) {
+      return new Range(this, rangeIndexVersion, min, max);
+    }
+
+    public Text forTextIndex(boolean commitOnClose) {
+      return new Text(this, commitOnClose);
+    }
+  }
+
+  class Wrapper implements IndexCreationContext {
+
+    private final IndexCreationContext _delegate;
+
+    Wrapper(IndexCreationContext delegate) {
+      _delegate = delegate;
+    }
+
+    @Override
+    public FieldSpec getFieldSpec() {
+      return _delegate.getFieldSpec();
+    }
+
+    @Override
+    public File getIndexDir() {
+      return _delegate.getIndexDir();
+    }
+
+    @Override
+    public boolean isOnHeap() {
+      return _delegate.isOnHeap();
+    }
+
+    @Override
+    public int getLengthOfLongestEntry() {
+      return _delegate.getLengthOfLongestEntry();
+    }
+
+    @Override
+    public int getMaxNumberOfMultiValueElements() {
+      return _delegate.getMaxNumberOfMultiValueElements();
+    }
+
+    @Override
+    public int getMaxRowLengthInBytes() {
+      return _delegate.getMaxRowLengthInBytes();
+    }
+
+    @Override
+    public boolean isSorted() {
+      return _delegate.isSorted();
+    }
+
+    @Override
+    public int getCardinality() {
+      return _delegate.getCardinality();
+    }
+
+    @Override
+    public int getTotalNumberOfEntries() {
+      return _delegate.getTotalNumberOfEntries();
+    }
+
+    @Override
+    public int getTotalDocs() {
+      return _delegate.getTotalDocs();
+    }
+
+    @Override
+    public boolean hasDictionary() {
+      return _delegate.hasDictionary();
+    }
+  }
+
+  class BloomFilter extends Wrapper {
+
+    private final BloomFilterConfig _bloomFilterConfig;
+
+    public BloomFilter(IndexCreationContext wrapped, BloomFilterConfig bloomFilterConfig) {
+      super(wrapped);
+      _bloomFilterConfig = bloomFilterConfig;
+    }
+
+    @Nullable
+    public BloomFilterConfig getBloomFilterConfig() {
+      return _bloomFilterConfig;
+    }
+  }
+
+  class Forward extends Wrapper {
+
+    private final ChunkCompressionType _chunkCompressionType;
+    private final Map<String, Map<String, String>> _columnProperties;
+
+    Forward(IndexCreationContext delegate,
+        ChunkCompressionType chunkCompressionType,
+        @Nullable Map<String, Map<String, String>> columnProperties) {
+      super(delegate);
+      _chunkCompressionType = chunkCompressionType;
+      _columnProperties = columnProperties;
+    }
+
+    public ChunkCompressionType getChunkCompressionType() {
+      return _chunkCompressionType;
+    }
+
+    @Nullable
+    public Map<String, Map<String, String>> getColumnProperties() {
+      return _columnProperties;
+    }
+  }
+
+  class Geospatial extends Wrapper {
+
+    private final H3IndexConfig _h3IndexConfig;
+
+    Geospatial(IndexCreationContext delegate, H3IndexConfig h3IndexConfig) {
+      super(delegate);
+      _h3IndexConfig = h3IndexConfig;
+    }
+
+    public H3IndexConfig getH3IndexConfig() {
+      return _h3IndexConfig;
+    }
+  }
+
+  class Inverted extends Wrapper {
+
+    Inverted(IndexCreationContext delegate) {
+      super(delegate);
+    }
+  }
+
+  class Json extends Wrapper {
+
+    Json(IndexCreationContext delegate) {
+      super(delegate);
+    }
+  }
+
+  class Range extends Wrapper {
+
+    private final Comparable<?> _min;
+    private final Comparable<?> _max;
+    private final int _rangeIndexVersion;
+
+
+    Range(IndexCreationContext delegate, int rangeIndexVersion, Comparable<?> min, Comparable<?> max) {
+      super(delegate);
+      _rangeIndexVersion = rangeIndexVersion;
+      _min = min;
+      _max = max;
+    }
+
+    public Comparable<?> getMin() {
+      return _min;
+    }
+
+    public Comparable<?> getMax() {
+      return _max;
+    }
+
+    public int getRangeIndexVersion() {
+      return _rangeIndexVersion;
+    }
+  }
+
+  class Text extends Wrapper {
+    private final boolean _commitOnClose;
+    private final boolean _isFst;
+    private final FSTType _fstType;
+    private final String[] _sortedUniqueElementsArray;
+
+    /**
+     * For text indexes
+     */
+    public Text(IndexCreationContext wrapped, boolean commitOnClose) {
+      super(wrapped);
+      _commitOnClose = commitOnClose;
+      _fstType = null;
+      _sortedUniqueElementsArray = null;
+      _isFst = false;
+    }
+
+    /**
+     * For FST indexes
+     */
+    public Text(IndexCreationContext wrapped, FSTType fstType, String[] sortedUniqueElementsArray) {
+      super(wrapped);
+      _commitOnClose = true;
+      _fstType = fstType;
+      _sortedUniqueElementsArray = sortedUniqueElementsArray;
+      _isFst = true;
+    }
+
+    public boolean isCommitOnClose() {
+      return _commitOnClose;
+    }
+
+    public FSTType getFstType() {
+      return _fstType;
+    }
+
+    public boolean isFst() {
+      return _isFst;
+    }
+
+    public String[] getSortedUniqueElementsArray() {
+      return _sortedUniqueElementsArray;
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
new file mode 100644
index 0000000..4dd1a02
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+/**
+ * Plugin interface to abstract index creation.
+ */
+public interface IndexCreatorProvider
+    extends ForwardIndexCreatorProvider, InvertedIndexCreatorProvider, JsonIndexCreatorProvider,
+            TextIndexCreatorProvider, GeoSpatialIndexCreatorProvider, RangeIndexCreatorProvider,
+            BloomFilterCreatorProvider {
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
new file mode 100644
index 0000000..40466f1
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Plugin registration point to allow index creation logic to be swapped out.
+ * Plugins should not reimplement Pinot's default index creation logic.
+ * Users provide an override to Pinot's index creation logic. This is simplified
+ * by extending {@see IndexCreatorProviders.Default}
+ */
+public final class IndexCreatorProviders {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreatorProviders.class);
+
+  private static final IndexCreatorProvider DEFAULT = defaultProvider();
+  private static final AtomicReference<IndexCreatorProvider> REGISTRATION = new AtomicReference<>(DEFAULT);
+
+  private IndexCreatorProviders() {
+  }
+
+  /**
+   * The caller provides a decorator to wrap the default provider, which allows plugins to create
+   * a delegation chain.
+   * @param provider index creation override
+   * @return true if this is the first invocation and the provider has not yet been used.
+   */
+  public static boolean registerProvider(IndexCreatorProvider provider) {
+    return REGISTRATION.compareAndSet(DEFAULT, provider);
+  }
+
+  /**
+   * Obtain the registered index creator provider. If the user has provided an override, then it will be used instead.
+   * If the user has not provided an override yet, then this action will prevent them from doing so.
+   * @return the global index provision logic.
+   */
+  public static IndexCreatorProvider getIndexCreatorProvider() {
+    return Holder.PROVIDER;
+  }
+
+  private static final class Holder {
+    public static final IndexCreatorProvider PROVIDER = REGISTRATION.get();
+  }
+
+  private static IndexCreatorProvider defaultProvider() {
+    // use MethodHandle to break circular dependency and keep implementation details encapsulated within
+    // pinot-segment-local
+    String className = "org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider";
+    try {
+      Class<?> clazz = Class.forName(className, false, IndexCreatorProviders.class.getClassLoader());
+      return (IndexCreatorProvider) MethodHandles.publicLookup()
+          .findConstructor(clazz, MethodType.methodType(void.class)).invoke();
+    } catch (Throwable missing) {
+      LOGGER.error("could not construct MethodHandle for {}", className, missing);
+      // this means pinot-segment-local isn't on the classpath, but this means
+      // no indexes will be created, so it's ok to return null
+      return null;
+    }
+  }
+
+  /**
+   * Extend this class to override index creation
+   */
+  public static class Default implements IndexCreatorProvider {
+
+    @Override
+    public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newBloomFilterCreator(context);
+    }
+
+    @Override
+    public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+        throws Exception {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newForwardIndexCreator(context);
+    }
+
+    @Override
+    public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newGeoSpatialIndexCreator(context);
+    }
+
+    @Override
+    public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newInvertedIndexCreator(context);
+    }
+
+    @Override
+    public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newJsonIndexCreator(context);
+    }
+
+    @Override
+    public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newRangeIndexCreator(context);
+    }
+
+    @Override
+    public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+        throws IOException {
+      if (DEFAULT == null) {
+        throw new UnsupportedOperationException("default implementation not present on classpath");
+      }
+      return DEFAULT.newTextIndexCreator(context);
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
new file mode 100644
index 0000000..2bf7f1a
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+
+
+public interface InvertedIndexCreatorProvider {
+  /**
+   * Creates a {@see DictionaryBasedInvertedIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
new file mode 100644
index 0000000..22e7f54
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+
+
+public interface JsonIndexCreatorProvider {
+  /**
+   * Creates a {@see JsonIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
new file mode 100644
index 0000000..a3abfc6
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+
+
+public interface RangeIndexCreatorProvider {
+  /**
+   * Creates a {@see CombinedInvertedIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
new file mode 100644
index 0000000..05f53ea
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+
+
+public interface TextIndexCreatorProvider {
+
+  /**
+   * Creates a {@see TextIndexCreator} from information about index creation.
+   * This allows a plugin to pattern match index creation information to select
+   * an appropriate implementation.
+   * @param context context about the index creation.
+   * @return a {@see ForwardIndexCreator}
+   * @throws IOException whenever something goes wrong matching or constructing the creator
+   */
+  TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+      throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index 19dbb2a..519f7c1 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -289,6 +289,10 @@ public class ColumnMetadataImpl implements ColumnMetadata {
     return builder.build();
   }
 
+  public static Builder builder() {
+    return new Builder();
+  }
+
   public static class Builder {
     private FieldSpec _fieldSpec;
     private int _totalDocs;
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
new file mode 100644
index 0000000..bfaebb3
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexCreatorProvidersTest {
+
+  @Test
+  public void indexCreatorProvidersLoadableWithoutDefaultImplementation()
+      throws IOException {
+    BloomFilterCreator mockBloomFilterCreator = mock(BloomFilterCreator.class);
+    assertTrue(IndexCreatorProviders.registerProvider(new IndexCreatorProviders.Default() {
+      @Override
+      public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) {
+        return mockBloomFilterCreator;
+      }
+    }));
+    // it's ok to load external overrides without an internal implementation present, e.g. for testing
+    assertEquals(mockBloomFilterCreator, IndexCreatorProviders.getIndexCreatorProvider()
+        .newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)));
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void whenDefaultImplementationMissingThrowUnsupportedOperationException()
+      throws IOException {
+    // the implementation is missing so no indexes will be created anyway...
+    new IndexCreatorProviders.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class));
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index d349da8..0367203 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -314,7 +314,7 @@ public class DictionaryToRawIndexConverter {
     int numDocs = segment.getSegmentMetadata().getTotalDocs();
     int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
 
-    try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
+    try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
         .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
             false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org