You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/16 20:22:16 UTC
[pinot] branch master updated: make index creator provision pluggable (#7885)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f9ab252 make index creator provision pluggable (#7885)
f9ab252 is described below
commit f9ab252980e4f973d60b9db2a0f5e7d5764bdaf2
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Thu Dec 16 20:21:48 2021 +0000
make index creator provision pluggable (#7885)
This allows index creation to be intercepted, so that the current static logic in SegmentIndexCreator can be extended or overridden. This is achieved by introducing a new interface IndexCreatorProvider which provides various new index creators from an IndexCreationContext which bundles all information about index creation. External users can register a decorator which can enhance or entirely replace the default index creator provision logic. Typically, a registered decorator should pa [...]
---
.../pinot/core/minion/RawIndexConverter.java | 19 +-
.../org/apache/pinot/perf/BenchmarkRangeIndex.java | 11 +-
.../ConvertToRawIndexTaskExecutor.java | 4 +-
pinot-segment-local/pom.xml | 3 +-
.../creator/impl/DefaultIndexCreatorProvider.java | 274 ++++++++++++
.../creator/impl/SegmentColumnarIndexCreator.java | 290 +++----------
.../impl/inv/BitSlicedRangeIndexCreator.java | 55 ++-
.../segment/index/loader/IndexHandlerFactory.java | 20 +-
.../loader/bloomfilter/BloomFilterHandler.java | 12 +-
.../loader/invertedindex/FSTIndexHandler.java | 17 +-
.../index/loader/invertedindex/H3IndexHandler.java | 12 +-
.../loader/invertedindex/InvertedIndexHandler.java | 14 +-
.../loader/invertedindex/JsonIndexHandler.java | 16 +-
.../loader/invertedindex/RangeIndexHandler.java | 25 +-
.../loader/invertedindex/TextIndexHandler.java | 10 +-
.../creator/impl/IndexCreatorOverrideTest.java | 88 ++++
.../index/creator/BitSlicedIndexCreatorTest.java | 20 +-
pinot-segment-spi/pom.xml | 6 +
.../spi/creator/BloomFilterCreatorProvider.java | 36 ++
.../spi/creator/ForwardIndexCreatorProvider.java | 35 ++
.../creator/GeoSpatialIndexCreatorProvider.java | 37 ++
.../segment/spi/creator/IndexCreationContext.java | 467 +++++++++++++++++++++
.../segment/spi/creator/IndexCreatorProvider.java | 28 ++
.../segment/spi/creator/IndexCreatorProviders.java | 159 +++++++
.../spi/creator/InvertedIndexCreatorProvider.java | 36 ++
.../spi/creator/JsonIndexCreatorProvider.java | 36 ++
.../spi/creator/RangeIndexCreatorProvider.java | 36 ++
.../spi/creator/TextIndexCreatorProvider.java | 37 ++
.../spi/index/metadata/ColumnMetadataImpl.java | 4 +
.../spi/creator/IndexCreatorProvidersTest.java | 53 +++
.../converter/DictionaryToRawIndexConverter.java | 4 +-
31 files changed, 1538 insertions(+), 326 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index 78552dd..f6d2a7d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -26,8 +26,6 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.utils.CrcUtils;
@@ -36,12 +34,15 @@ import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.ForwardIndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -82,13 +83,14 @@ public class RawIndexConverter {
private final File _convertedIndexDir;
private final PropertiesConfiguration _convertedProperties;
private final String _columnsToConvert;
+ private final ForwardIndexCreatorProvider _indexCreatorProvider;
/**
* NOTE: original segment should be in V1 format.
* TODO: support V3 format
*/
public RawIndexConverter(String rawTableName, File originalIndexDir, File convertedIndexDir,
- @Nullable String columnsToConvert)
+ @Nullable String columnsToConvert, ForwardIndexCreatorProvider indexCreatorProvider)
throws Exception {
FileUtils.copyDirectory(originalIndexDir, convertedIndexDir);
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
@@ -101,6 +103,7 @@ public class RawIndexConverter {
_convertedProperties =
new PropertiesConfiguration(new File(_convertedIndexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
_columnsToConvert = columnsToConvert;
+ _indexCreatorProvider = indexCreatorProvider;
}
public boolean convert()
@@ -205,11 +208,11 @@ public class RawIndexConverter {
assert dictionary != null;
DataType storedType = dictionary.getValueType();
int numDocs = _originalSegmentMetadata.getTotalDocs();
- int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
- try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.LZ4, columnName,
- storedType, numDocs, lengthOfLongestEntry, false,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ ColumnMetadata columnMetadata = _originalSegmentMetadata.getColumnMetadataFor(columnName);
+ try (ForwardIndexCreator rawIndexCreator = _indexCreatorProvider.newForwardIndexCreator(
+ IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
+ .withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue()))
+ .withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
case INT:
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
index 60db2a3..e250eb6 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRangeIndex.java
@@ -191,14 +191,7 @@ public class BenchmarkRangeIndex {
public void setup()
throws IOException {
super.setup();
- ColumnMetadata metadata = new ColumnMetadataImpl.Builder()
- .setFieldSpec(_fieldSpec)
- .setTotalDocs(_numDocs)
- .setHasDictionary(false)
- .setMaxValue(max())
- .setMinValue(min())
- .build();
- _creator = new BitSlicedRangeIndexCreator(_indexDir, metadata);
+ _creator = new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max());
}
}
@@ -328,7 +321,7 @@ public class BenchmarkRangeIndex {
@Override
protected RawValueBasedInvertedIndexCreator newCreator() {
- return new BitSlicedRangeIndexCreator(_indexDir, metadata());
+ return new BitSlicedRangeIndexCreator(_indexDir, _fieldSpec, min(), max());
}
}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
index 7a5a33f..ba3e610 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.minion.RawIndexConverter;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -39,7 +40,8 @@ public class ConvertToRawIndexTaskExecutor extends BaseSingleSegmentConversionEx
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
new RawIndexConverter(rawTableName, indexDir, workingDir,
- configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY)).convert();
+ configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
+ IndexCreatorProviders.getIndexCreatorProvider()).convert();
return new SegmentConversionResult.Builder().setFile(workingDir)
.setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
.setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
diff --git a/pinot-segment-local/pom.xml b/pinot-segment-local/pom.xml
index c60da9c..e1bab72 100644
--- a/pinot-segment-local/pom.xml
+++ b/pinot-segment-local/pom.xml
@@ -122,9 +122,10 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
+ <!-- required for static mock in IndexCreatorOverrideTest -->
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
new file mode 100644
index 0000000..2753459
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * This class centralizes logic for how to create indexes. It can be overridden
+ * by SPI {@link IndexCreatorProviders} and should not be constructed directly, but
+ * accessed only via {@link IndexCreatorProviders#getIndexCreatorProvider}. Unless
+ * a user provides an override, this is the logic which will be used to create
+ * each index type.
+ */
+public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
+
+ @Override
+ public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+ throws Exception {
+ if (!context.hasDictionary()) {
+ boolean deriveNumDocsPerChunk =
+ shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(), context.getColumnProperties());
+ int writerVersion = getRawIndexWriterVersion(context.getFieldSpec().getName(), context.getColumnProperties());
+ if (context.getFieldSpec().isSingleValueField()) {
+ return getRawIndexCreatorForSVColumn(context.getIndexDir(), context.getChunkCompressionType(),
+ context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(),
+ context.getTotalDocs(), context.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion);
+ } else {
+ return getRawIndexCreatorForMVColumn(context.getIndexDir(), context.getChunkCompressionType(),
+ context.getFieldSpec().getName(), context.getFieldSpec().getDataType().getStoredType(),
+ context.getTotalDocs(), context.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
+ context.getMaxRowLengthInBytes());
+ }
+ } else {
+ if (context.getFieldSpec().isSingleValueField()) {
+ if (context.isSorted()) {
+ return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality());
+ } else {
+ return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs());
+ }
+ } else {
+ return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
+ }
+ }
+ }
+
+ @Override
+ public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException {
+ if (context.isOnHeap()) {
+ return new OnHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality());
+ } else {
+ return new OffHeapBitmapInvertedIndexCreator(context.getIndexDir(), context.getFieldSpec(),
+ context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
+ }
+ }
+
+ @Override
+ public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+ throws IOException {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "Json index is currently only supported on single-value columns");
+ Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+ "Json index is currently only supported on STRING columns");
+ return context.isOnHeap() ? new OnHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName())
+ : new OffHeapJsonIndexCreator(context.getIndexDir(), context.getFieldSpec().getName());
+ }
+
+ @Override
+ public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+ throws IOException {
+ if (context.isFst()) {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "FST index is currently only supported on single-value columns");
+ Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+ "FST index is currently only supported on STRING type columns");
+ Preconditions.checkState(context.hasDictionary(),
+ "FST index is currently only supported on dictionary-encoded columns");
+ String[] sortedValues = context.getSortedUniqueElementsArray();
+ if (context.getFstType() == FSTType.NATIVE) {
+ return new NativeFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues);
+ } else {
+ return new LuceneFSTIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), sortedValues);
+ }
+ } else {
+ Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.STRING,
+ "Text index is currently only supported on STRING type columns");
+ return new LuceneTextIndexCreator(context.getFieldSpec().getName(), context.getIndexDir(),
+ context.isCommitOnClose());
+ }
+ }
+
+ @Override
+ public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+ throws IOException {
+ Preconditions.checkState(context.getFieldSpec().isSingleValueField(),
+ "H3 index is currently only supported on single-value columns");
+ Preconditions.checkState(context.getFieldSpec().getDataType().getStoredType() == FieldSpec.DataType.BYTES,
+ "H3 index is currently only supported on BYTES columns");
+ H3IndexResolution resolution = Objects.requireNonNull(context.getH3IndexConfig()).getResolution();
+ return context.isOnHeap() ? new OnHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ resolution) : new OffHeapH3IndexCreator(context.getIndexDir(), context.getFieldSpec().getName(), resolution);
+ }
+
+ public static boolean shouldDeriveNumDocsPerChunk(String columnName,
+ Map<String, Map<String, String>> columnProperties) {
+ if (columnProperties != null) {
+ Map<String, String> properties = columnProperties.get(columnName);
+ return properties != null && Boolean.parseBoolean(
+ properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
+ }
+ return false;
+ }
+
+ public static int getRawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
+ if (columnProperties != null && columnProperties.get(columnName) != null) {
+ Map<String, String> properties = columnProperties.get(columnName);
+ String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
+ if (version == null) {
+ return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ }
+ return Integer.parseInt(version);
+ }
+ return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ }
+
+ /**
+ * Helper method to build the raw index creator for the column.
+ * Assumes that column to be indexed is single valued.
+ *
+ * @param file Output index directory
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param lengthOfLongestEntry Length of longest entry
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
+ String column, FieldSpec.DataType dataType, int totalDocs, int lengthOfLongestEntry,
+ boolean deriveNumDocsPerChunk,
+ int writerVersion)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+ writerVersion);
+ case STRING:
+ case BYTES:
+ return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+ lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
+ default:
+ throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+ }
+ }
+
+ /**
+ * Helper method to build the raw index creator for the column.
+ * Assumes that column to be indexed is multi valued.
+ *
+ * @param file Output index directory
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+ * per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @param maxRowLengthInBytes the length of the longest row in bytes
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
+ String column, FieldSpec.DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
+ boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+ maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
+ case STRING:
+ case BYTES:
+ return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
+ maxRowLengthInBytes, maxNumberOfMultiValueElements);
+ default:
+ throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+ }
+ }
+
+ @Override
+ public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+ throws IOException {
+ return new OnHeapGuavaBloomFilterCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+ context.getCardinality(), Objects.requireNonNull(context.getBloomFilterConfig()));
+ }
+
+ @Override
+ public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+ throws IOException {
+ if (context.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION && context.getFieldSpec()
+ .isSingleValueField()) {
+ if (context.hasDictionary()) {
+ return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), context.getCardinality());
+ }
+ return new BitSlicedRangeIndexCreator(context.getIndexDir(), context.getFieldSpec(), context.getMin(),
+ context.getMax());
+ }
+ // default to RangeIndexCreator for the time being
+ return new RangeIndexCreator(context.getIndexDir(), context.getFieldSpec(),
+ context.hasDictionary() ? FieldSpec.DataType.INT : context.getFieldSpec().getDataType(), -1,
+ -1, context.getTotalDocs(), context.getTotalNumberOfEntries());
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 764d36f..b01dd3f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -33,28 +33,14 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
-import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OnHeapH3IndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OnHeapJsonIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.creator.SegmentCreator;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -64,9 +50,7 @@ import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.creator.SegmentIndexCreationInfo;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
-import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
-import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
@@ -99,6 +83,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
private SegmentGeneratorConfig _config;
private Map<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
+ private final IndexCreatorProvider _indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
private final Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
private final Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
private final Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
@@ -179,126 +164,70 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
String columnName = fieldSpec.getName();
- DataType storedType = fieldSpec.getDataType().getStoredType();
- ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
- Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
- boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
-
+ ColumnIndexCreationInfo columnIndexCreationInfo = indexCreationInfoMap.get(columnName);
+ Preconditions.checkNotNull(columnIndexCreationInfo, "Missing index creation info for column: %s", columnName);
+ boolean dictEnabledColumn = createDictionaryForColumn(columnIndexCreationInfo, segmentCreationSpec, fieldSpec);
+ Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName),
+ "Cannot create inverted index for raw index column: %s", columnName);
+
+ IndexCreationContext.Common context = IndexCreationContext.builder()
+ .withIndexDir(_indexDir)
+ .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
+ .withDictionary(dictEnabledColumn)
+ .withFieldSpec(fieldSpec)
+ .withTotalDocs(segmentIndexCreationInfo.getTotalDocs())
+ .withTotalNumberOfEntries(columnIndexCreationInfo.getTotalNumberOfEntries())
+ .withColumnIndexCreationInfo(columnIndexCreationInfo)
+ .sorted(columnIndexCreationInfo.isSorted())
+ .onHeap(segmentCreationSpec.isOnHeap())
+ .build();
+ // Initialize forward index creator
+ ChunkCompressionType chunkCompressionType =
+ dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec);
+ _forwardIndexCreatorMap.put(columnName, _indexCreatorProvider.newForwardIndexCreator(
+ context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties())));
+
+ // Initialize inverted index creator; skip creating inverted index if sorted
+ if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) {
+ _invertedIndexCreatorMap.put(columnName,
+ _indexCreatorProvider.newInvertedIndexCreator(context.forInvertedIndex()));
+ }
if (dictEnabledColumn) {
// Create dictionary-encoded index
-
// Initialize dictionary creator
SegmentDictionaryCreator dictionaryCreator =
- new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
- indexCreationInfo.isUseVarLengthDictionary());
+ new SegmentDictionaryCreator(columnIndexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+ columnIndexCreationInfo.isUseVarLengthDictionary());
_dictionaryCreatorMap.put(columnName, dictionaryCreator);
-
// Create dictionary
try {
dictionaryCreator.build();
} catch (Exception e) {
LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
- fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry());
+ fieldSpec.getName(), columnIndexCreationInfo.getDistinctValueCount(),
+ dictionaryCreator.getNumBytesPerEntry());
throw e;
}
-
- // Initialize forward index creator
- int cardinality = indexCreationInfo.getDistinctValueCount();
- if (fieldSpec.isSingleValueField()) {
- if (indexCreationInfo.isSorted()) {
- _forwardIndexCreatorMap.put(columnName,
- new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
- } else {
- _forwardIndexCreatorMap.put(columnName,
- new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
- }
- } else {
- _forwardIndexCreatorMap.put(columnName,
- new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs,
- indexCreationInfo.getTotalNumberOfEntries()));
- }
-
- // Initialize inverted index creator; skip creating inverted index if sorted
- if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
- if (segmentCreationSpec.isOnHeap()) {
- _invertedIndexCreatorMap.put(columnName,
- new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
- } else {
- _invertedIndexCreatorMap.put(columnName,
- new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
- indexCreationInfo.getTotalNumberOfEntries()));
- }
- }
- } else {
- // Create raw index
- Preconditions.checkState(!invertedIndexColumns.contains(columnName),
- "Cannot create inverted index for raw index column: %s", columnName);
-
- ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec);
-
- // Initialize forward index creator
- boolean deriveNumDocsPerChunk =
- shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
- int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
- if (fieldSpec.isSingleValueField()) {
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
- indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
- } else {
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
- indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk, writerVersion,
- indexCreationInfo.getMaxRowLengthInBytes()));
- }
}
if (textIndexColumns.contains(columnName)) {
- // Initialize text index creator
- Preconditions.checkState(storedType == DataType.STRING,
- "Text index is currently only supported on STRING type columns");
- _textIndexCreatorMap.put(columnName,
- new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+ _textIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator(context.forTextIndex(true)));
}
if (fstIndexColumns.contains(columnName)) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "FST index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.STRING,
- "FST index is currently only supported on STRING type columns");
- Preconditions.checkState(dictEnabledColumn,
- "FST index is currently only supported on dictionary-encoded columns");
- String[] sortedValues = (String[]) indexCreationInfo.getSortedUniqueElementsArray();
- TextIndexCreator textIndexCreator;
- if (_config.getFSTIndexType() == FSTType.NATIVE) {
- textIndexCreator = new NativeFSTIndexCreator(_indexDir, columnName, sortedValues);
- } else {
- textIndexCreator = new LuceneFSTIndexCreator(_indexDir, columnName, sortedValues);
- }
-
- _fstIndexCreatorMap.put(columnName, textIndexCreator);
+ _fstIndexCreatorMap.put(columnName, _indexCreatorProvider.newTextIndexCreator(
+ context.forFSTIndex(_config.getFSTIndexType(),
+ (String[]) columnIndexCreationInfo.getSortedUniqueElementsArray())));
}
if (jsonIndexColumns.contains(columnName)) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "Json index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.STRING,
- "Json index is currently only supported on STRING columns");
- JsonIndexCreator jsonIndexCreator =
- segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
- : new OffHeapJsonIndexCreator(_indexDir, columnName);
- _jsonIndexCreatorMap.put(columnName, jsonIndexCreator);
+ _jsonIndexCreatorMap.put(columnName, _indexCreatorProvider.newJsonIndexCreator(context.forJsonIndex()));
}
H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
if (h3IndexConfig != null) {
- Preconditions.checkState(fieldSpec.isSingleValueField(),
- "H3 index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
- H3IndexResolution resolution = h3IndexConfig.getResolution();
- GeoSpatialIndexCreator h3IndexCreator =
- segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution)
- : new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
- _h3IndexCreatorMap.put(columnName, h3IndexCreator);
+ _h3IndexCreatorMap.put(columnName,
+ _indexCreatorProvider.newGeoSpatialIndexCreator(context.forGeospatialIndex(h3IndexConfig)));
}
_nullHandlingEnabled = _config.isNullHandlingEnabled();
@@ -309,26 +238,29 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
- public static boolean shouldDeriveNumDocsPerChunk(String columnName,
- Map<String, Map<String, String>> columnProperties) {
- if (columnProperties != null) {
- Map<String, String> properties = columnProperties.get(columnName);
- return properties != null && Boolean.parseBoolean(
- properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
- }
- return false;
- }
-
- public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
- if (columnProperties != null && columnProperties.get(columnName) != null) {
- Map<String, String> properties = columnProperties.get(columnName);
- String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
- if (version == null) {
- return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
- }
- return Integer.parseInt(version);
+ /**
+ * Returns true if dictionary should be created for a column, false otherwise.
+ * Currently there are two sources for this config:
+ * <ul>
+ * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+ * <li> SegmentGeneratorConfig</li>
+ * </ul>
+ *
+ * This method gives preference to the SegmentGeneratorConfig first.
+ *
+ * @param info Column index creation info
+ * @param config Segment generation config
+ * @param spec Field spec for the column
+ * @return True if dictionary should be created for the column, false otherwise
+ */
+ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+ FieldSpec spec) {
+ String column = spec.getName();
+ if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
+ .containsKey(column)) {
+ return false;
}
- return BaseChunkSVForwardIndexWriter.DEFAULT_VERSION;
+ return info.isCreateDictionary();
}
/**
@@ -347,7 +279,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
FieldSpec fieldSpec) {
ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
if (compressionType == null) {
- if (fieldSpec.getFieldType() == FieldType.METRIC) {
+ if (fieldSpec.getFieldType() == FieldSpec.FieldType.METRIC) {
return ChunkCompressionType.PASS_THROUGH;
} else {
return ChunkCompressionType.LZ4;
@@ -357,31 +289,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
- /**
- * Returns true if dictionary should be created for a column, false otherwise.
- * Currently there are two sources for this config:
- * <ul>
- * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
- * <li> SegmentGeneratorConfig</li>
- * </ul>
- *
- * This method gives preference to the SegmentGeneratorConfig first.
- *
- * @param info Column index creation info
- * @param config Segment generation config
- * @param spec Field spec for the column
- * @return True if dictionary should be created for the column, false otherwise
- */
- private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
- FieldSpec spec) {
- String column = spec.getName();
- if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
- .containsKey(column)) {
- return false;
- }
- return info.isCreateDictionary();
- }
-
@Override
public void indexRow(GenericRow row)
throws IOException {
@@ -795,71 +702,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
properties.subset(COLUMN_PROPS_KEY_PREFIX + column).clear();
}
- /**
- * Helper method to build the raw index creator for the column.
- * Assumes that column to be indexed is single valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param lengthOfLongestEntry Length of longest entry
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
- * @param writerVersion version to use for the raw index writer
- * @return raw index creator
- */
- public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
- String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
- int writerVersion)
- throws IOException {
- switch (dataType.getStoredType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
- writerVersion);
- case STRING:
- case BYTES:
- return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
- lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
- default:
- throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
- }
- }
-
- /**
- * Helper method to build the raw index creator for the column.
- * Assumes that column to be indexed is single valued.
- *
- * @param file Output index file
- * @param column Column name
- * @param totalDocs Total number of documents to index
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
- * per chunk
- * @param writerVersion version to use for the raw index writer
- * @param maxRowLengthInBytes the length of the longest row in bytes
- * @return raw index creator
- */
- public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
- String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
- boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
- throws IOException {
- switch (dataType.getStoredType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
- maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
- case STRING:
- case BYTES:
- return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
- maxRowLengthInBytes, maxNumberOfMultiValueElements);
- default:
- throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
- }
- }
-
@Override
public void close()
throws IOException {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
index 4c0b98b..0f96bf4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/BitSlicedRangeIndexCreator.java
@@ -18,10 +18,10 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.inv;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import org.apache.pinot.segment.local.utils.FPOrdering;
-import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
import org.apache.pinot.spi.data.FieldSpec;
import org.roaringbitmap.RangeBitmap;
@@ -41,13 +41,33 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
private final File _rangeIndexFile;
private final long _minValue;
- public BitSlicedRangeIndexCreator(File indexDir, ColumnMetadata metadata) {
- if (!metadata.isSingleValue()) {
- throw new IllegalArgumentException("MV columns not supported");
- }
- _appender = RangeBitmap.appender(maxValue(metadata));
- _rangeIndexFile = new File(indexDir, metadata.getColumnName() + BITMAP_RANGE_INDEX_FILE_EXTENSION);
- _minValue = minValue(metadata);
+ private BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, long minValue, long maxValue) {
+ Preconditions.checkArgument(fieldSpec.isSingleValueField(), "MV columns not supported");
+ _rangeIndexFile = new File(indexDir, fieldSpec.getName() + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+ _appender = RangeBitmap.appender(maxValue);
+ _minValue = minValue;
+ }
+
+ /**
+ * For dictionary-encoded columns
+ * @param indexDir the directory for the index
+ * @param fieldSpec the specification of the field
+ * @param cardinality the cardinality of the dictionary
+ */
+ public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, int cardinality) {
+ this(indexDir, fieldSpec, 0, cardinality - 1);
+ }
+
+ /**
+ * For raw (non-dictionary-encoded) columns
+ * @param indexDir the directory for the index
+ * @param fieldSpec the specification of the field
+ * @param minValue the minimum value
+ * @param maxValue the maximum value
+ */
+ public BitSlicedRangeIndexCreator(File indexDir, FieldSpec fieldSpec, Comparable<?> minValue,
+ Comparable<?> maxValue) {
+ this(indexDir, fieldSpec, minValue(fieldSpec, minValue), maxValue(fieldSpec, minValue, maxValue));
}
@Override
@@ -110,13 +130,8 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
throws IOException {
}
- private static long maxValue(ColumnMetadata metadata) {
- if (metadata.hasDictionary()) {
- return metadata.getCardinality() - 1;
- }
- FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
- Comparable<?> minValue = metadata.getMinValue();
- Comparable<?> maxValue = metadata.getMaxValue();
+ private static long maxValue(FieldSpec fieldSpec, Comparable<?> minValue, Comparable<?> maxValue) {
+ FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
if (storedType == INT || storedType == LONG) {
return ((Number) maxValue).longValue() - ((Number) minValue).longValue();
}
@@ -126,15 +141,11 @@ public class BitSlicedRangeIndexCreator implements CombinedInvertedIndexCreator
if (storedType == DOUBLE) {
return 0xFFFFFFFFFFFFFFFFL;
}
- throw new IllegalArgumentException("Unsupported data type: " + metadata.getDataType());
+ throw new IllegalArgumentException("Unsupported data type: " + fieldSpec.getDataType());
}
- private static long minValue(ColumnMetadata metadata) {
- if (metadata.hasDictionary()) {
- return 0;
- }
- FieldSpec.DataType storedType = metadata.getDataType().getStoredType();
- Comparable<?> minValue = metadata.getMinValue();
+ private static long minValue(FieldSpec fieldSpec, Comparable<?> minValue) {
+ FieldSpec.DataType storedType = fieldSpec.getDataType().getStoredType();
if (storedType == INT || storedType == LONG) {
return ((Number) minValue).longValue();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index 9dd9ecb..a6fe925 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -26,6 +26,8 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.Inverte
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -40,22 +42,26 @@ public class IndexHandlerFactory {
public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata,
IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) {
+ IndexCreatorProvider indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
switch (type) {
case INVERTED_INDEX:
- return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
case RANGE_INDEX:
- return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
case TEXT_INDEX:
- return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
case FST_INDEX:
return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
- indexLoadingConfig.getFSTIndexType());
+ indexLoadingConfig.getFSTIndexType(), indexCreatorProvider);
case JSON_INDEX:
- return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
case H3_INDEX:
- return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
case BLOOM_FILTER:
- return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
+ return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
+ indexCreatorProvider);
default:
return NO_OP_HANDLER;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index b8a3264..f992344 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -24,7 +24,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloomFilterCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -37,6 +36,8 @@ import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -57,13 +58,15 @@ public class BloomFilterHandler implements IndexHandler {
private final SegmentMetadataImpl _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
+ private final BloomFilterCreatorProvider _indexCreatorProvider;
public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentWriter = segmentWriter;
_segmentMetadata = segmentMetadata;
_bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -112,8 +115,9 @@ public class BloomFilterHandler implements IndexHandler {
BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName,
bloomFilterConfig);
- try (BloomFilterCreator bloomFilterCreator = new OnHeapGuavaBloomFilterCreator(_indexDir, columnName,
- columnMetadata.getCardinality(), bloomFilterConfig);
+ try (BloomFilterCreator bloomFilterCreator = _indexCreatorProvider.newBloomFilterCreator(
+ IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forBloomFilter(bloomFilterConfig));
Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
int length = dictionary.length();
for (int i = 0; i < length; i++) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 50f2b67..292bf0c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -24,15 +24,15 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
-import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -71,14 +71,16 @@ public class FSTIndexHandler implements IndexHandler {
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final FSTType _fstType;
+ private final TextIndexCreatorProvider _indexCreatorProvider;
public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, FSTType fstType) {
+ SegmentDirectory.Writer segmentWriter, FSTType fstType, TextIndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
_fstType = fstType;
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -136,12 +138,9 @@ public class FSTIndexHandler implements IndexHandler {
LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName,
columnMetadata.getCardinality());
- TextIndexCreator fstIndexCreator;
- if (_fstType == FSTType.LUCENE) {
- fstIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);
- } else {
- fstIndexCreator = new NativeFSTIndexCreator(_indexDir, column, null);
- }
+ TextIndexCreator fstIndexCreator = _indexCreatorProvider.newTextIndexCreator(
+ IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forFSTIndex(_fstType, null));
try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
for (int dictId = 0; dictId < dictionary.length(); dictId++) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index ef96114..d2fceaa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -32,7 +32,10 @@ import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -53,13 +56,15 @@ public class H3IndexHandler implements IndexHandler {
private final SegmentMetadataImpl _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Map<String, H3IndexConfig> _h3Configs;
+ private final IndexCreatorProvider _indexCreatorProvider;
public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_h3Configs = indexLoadingConfig.getH3IndexConfigs();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -129,8 +134,9 @@ public class H3IndexHandler implements IndexHandler {
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
- OffHeapH3IndexCreator h3IndexCreator = new OffHeapH3IndexCreator(_indexDir, columnName,
- _h3Configs.get(columnName).getResolution())) {
+ GeoSpatialIndexCreator h3IndexCreator = _indexCreatorProvider.newGeoSpatialIndexCreator(
+ IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
int dictId = forwardIndexReader.getDictId(i, readerContext);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index ee9d78f..693ca32 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -23,14 +23,16 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -47,13 +49,15 @@ public class InvertedIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
+ private final InvertedIndexCreatorProvider _indexCreatorProvider;
public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getInvertedIndexColumns());
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -100,9 +104,9 @@ public class InvertedIndexHandler implements IndexHandler {
// Create new inverted index for the column.
LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, column);
int numDocs = columnMetadata.getTotalDocs();
- try (OffHeapBitmapInvertedIndexCreator creator = new OffHeapBitmapInvertedIndexCreator(_indexDir,
- columnMetadata.getFieldSpec(), columnMetadata.getCardinality(), numDocs,
- columnMetadata.getTotalNumberOfEntries())) {
+ try (DictionaryBasedInvertedIndexCreator creator = _indexCreatorProvider.newInvertedIndexCreator(
+ IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ .forInvertedIndex())) {
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
if (columnMetadata.isSingleValue()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 93d52e8..27b8ade 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -24,14 +24,16 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.json.OffHeapJsonIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -50,13 +52,15 @@ public class JsonIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
+ private final JsonIndexCreatorProvider _indexCreatorProvider;
public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, JsonIndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns());
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -122,11 +126,11 @@ public class JsonIndexHandler implements IndexHandler {
private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
throws IOException {
- String columnName = columnMetadata.getColumnName();
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
- OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) {
+ JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+ .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -138,10 +142,10 @@ public class JsonIndexHandler implements IndexHandler {
private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
throws IOException {
- String columnName = columnMetadata.getColumnName();
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- OffHeapJsonIndexCreator jsonIndexCreator = new OffHeapJsonIndexCreator(_indexDir, columnName)) {
+ JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+ .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 55c964b..3d5551d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -23,21 +23,20 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
-import org.apache.pinot.spi.data.FieldSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,14 +50,16 @@ public class RangeIndexHandler implements IndexHandler {
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final int _rangeIndexVersion;
+ private final RangeIndexCreatorProvider _indexCreatorProvider;
public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider indexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getRangeIndexColumns());
_rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion();
+ _indexCreatorProvider = indexCreatorProvider;
}
@Override
@@ -125,7 +126,7 @@ public class RangeIndexHandler implements IndexHandler {
int numDocs = columnMetadata.getTotalDocs();
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, FieldSpec.DataType.INT)) {
+ CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
if (columnMetadata.isSingleValue()) {
// Single-value column
for (int i = 0; i < numDocs; i++) {
@@ -148,8 +149,7 @@ public class RangeIndexHandler implements IndexHandler {
int numDocs = columnMetadata.getTotalDocs();
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata,
- columnMetadata.getDataType())) {
+ CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
if (columnMetadata.isSingleValue()) {
// Single-value column.
switch (columnMetadata.getDataType()) {
@@ -216,13 +216,10 @@ public class RangeIndexHandler implements IndexHandler {
}
}
- private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata, FieldSpec.DataType dataType)
+ private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata)
throws IOException {
- if (_rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION && columnMetadata.isSingleValue()) {
- return new BitSlicedRangeIndexCreator(_indexDir, columnMetadata);
- }
- // default to RangeIndexCreator for the time being
- return new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(), dataType, -1, -1,
- columnMetadata.getTotalDocs(), columnMetadata.getTotalNumberOfEntries());
+ return _indexCreatorProvider.newRangeIndexCreator(
+ IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ .forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(), columnMetadata.getMaxValue()));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 19e3a91..6e7c64e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -40,13 +40,14 @@ import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
-import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -85,13 +86,15 @@ public class TextIndexHandler implements IndexHandler {
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
+ private final TextIndexCreatorProvider _textIndexCreatorProvider;
public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter) {
+ SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider textIndexCreatorProvider) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns());
+ _textIndexCreatorProvider = textIndexCreatorProvider;
}
@Override
@@ -144,7 +147,8 @@ public class TextIndexHandler implements IndexHandler {
// based on segmentVersion.
try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- LuceneTextIndexCreator textIndexCreator = new LuceneTextIndexCreator(column, segmentDirectory, true)) {
+ TextIndexCreator textIndexCreator = _textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
+ .withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true))) {
if (columnMetadata.isSingleValue()) {
processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
} else {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
new file mode 100644
index 0000000..6e32884
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.UUID;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.apache.commons.io.FileUtils.deleteQuietly;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Checks that index creator provision can be overridden by subclassing
+ * {@link IndexCreatorProviders.Default} and installing the subclass as the provider returned by
+ * {@link IndexCreatorProviders#getIndexCreatorProvider()}.
+ */
+public class IndexCreatorOverrideTest {
+
+  private File _file;
+
+  @BeforeTest
+  public void before()
+      throws IOException {
+    _file = Files.createTempFile("IndexCreatorOverrideTest", UUID.randomUUID().toString()).toFile();
+  }
+
+  @AfterTest
+  public void cleanup() {
+    deleteQuietly(_file);
+  }
+
+  /**
+   * A provider that special-cases cardinality >= 10000 must hand out its own creator for such columns
+   * and fall back to the default creator otherwise.
+   */
+  @Test
+  public void testOverrideInvertedIndexCreation()
+      throws IOException {
+    DictionaryBasedInvertedIndexCreator highCardinalityInvertedIndex = mock(DictionaryBasedInvertedIndexCreator.class);
+    IndexCreatorProvider provider = new IndexCreatorProviders.Default() {
+      @Override
+      public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+          throws IOException {
+        if (context.getCardinality() >= 10000) {
+          return highCardinalityInvertedIndex;
+        }
+        return super.newInvertedIndexCreator(context);
+      }
+    };
+    // A static mock stays registered on the current thread until it is closed; scoping it with
+    // try-with-resources keeps the stubbed provider from leaking into other tests on this thread.
+    try (org.mockito.MockedStatic<IndexCreatorProviders> mocked = mockStatic(IndexCreatorProviders.class)) {
+      mocked.when(IndexCreatorProviders::getIndexCreatorProvider).thenReturn(provider);
+      IndexCreationContext.Inverted highCardinalityContext = newContext(Integer.MAX_VALUE);
+      assertEquals(IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext),
+          highCardinalityInvertedIndex);
+      IndexCreationContext.Inverted lowCardinalityContext = newContext(1);
+      assertTrue(IndexCreatorProviders.getIndexCreatorProvider()
+          .newInvertedIndexCreator(lowCardinalityContext) instanceof OffHeapBitmapInvertedIndexCreator);
+    }
+  }
+
+  /**
+   * Builds an inverted index context for a single-value INT dimension named "test" with the given cardinality.
+   */
+  private IndexCreationContext.Inverted newContext(int cardinality) {
+    FieldSpec fieldSpec = new DimensionFieldSpec("test", FieldSpec.DataType.INT, true);
+    return IndexCreationContext.builder().withIndexDir(_file)
+        .withColumnMetadata(ColumnMetadataImpl.builder().setFieldSpec(fieldSpec).setCardinality(cardinality).build())
+        .build().forInvertedIndex();
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
index c06ebee..0ad213c 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/BitSlicedIndexCreatorTest.java
@@ -65,14 +65,12 @@ public class BitSlicedIndexCreatorTest {
@Test(expectedExceptions = IllegalArgumentException.class)
public void testFailToCreateRawString() {
- new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
- .setFieldSpec(new DimensionFieldSpec("foo", STRING, true)).build());
+ new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", STRING, true), null, null);
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testFailToCreateMV() {
- new BitSlicedRangeIndexCreator(INDEX_DIR, new ColumnMetadataImpl.Builder()
- .setFieldSpec(new DimensionFieldSpec("foo", INT, false)).build());
+ new BitSlicedRangeIndexCreator(INDEX_DIR, new DimensionFieldSpec("foo", INT, false), 0, 10);
}
@Test
@@ -153,7 +151,7 @@ public class BitSlicedIndexCreatorTest {
private void testInt(Dataset<int[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
for (int value : dataset.values()) {
creator.add(value);
}
@@ -181,7 +179,7 @@ public class BitSlicedIndexCreatorTest {
private void testLong(Dataset<long[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
for (long value : dataset.values()) {
creator.add(value);
}
@@ -209,7 +207,7 @@ public class BitSlicedIndexCreatorTest {
private void testFloat(Dataset<float[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
for (float value : dataset.values()) {
creator.add(value);
}
@@ -237,7 +235,7 @@ public class BitSlicedIndexCreatorTest {
private void testDouble(Dataset<double[]> dataset)
throws IOException {
ColumnMetadata metadata = dataset.toColumnMetadata();
- try (BitSlicedRangeIndexCreator creator = new BitSlicedRangeIndexCreator(INDEX_DIR, metadata)) {
+ try (BitSlicedRangeIndexCreator creator = newBitslicedIndexCreator(metadata)) {
for (double value : dataset.values()) {
creator.add(value);
}
@@ -262,6 +260,12 @@ public class BitSlicedIndexCreatorTest {
}
}
+  /**
+   * Creates the range index creator under test: columns with a dictionary pass their cardinality,
+   * raw columns pass their min and max values.
+   */
+  private static BitSlicedRangeIndexCreator newBitslicedIndexCreator(ColumnMetadata metadata) {
+    if (!metadata.hasDictionary()) {
+      return new BitSlicedRangeIndexCreator(INDEX_DIR, metadata.getFieldSpec(), metadata.getMinValue(),
+          metadata.getMaxValue());
+    }
+    return new BitSlicedRangeIndexCreator(INDEX_DIR, metadata.getFieldSpec(), metadata.getCardinality());
+  }
+
enum Distribution {
NORMAL {
@Override
diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml
index 1af8184..89a56d9 100644
--- a/pinot-segment-spi/pom.xml
+++ b/pinot-segment-spi/pom.xml
@@ -93,5 +93,11 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
new file mode 100644
index 0000000..ba23359
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/BloomFilterCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+
+
+public interface BloomFilterCreatorProvider {
+ /**
+ * Creates a {@link BloomFilterCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link BloomFilterCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
new file mode 100644
index 0000000..8c15dbb
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/ForwardIndexCreatorProvider.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+
+
+public interface ForwardIndexCreatorProvider {
+ /**
+ * Creates a {@link ForwardIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link ForwardIndexCreator}
+ * @throws Exception whenever something goes wrong matching or constructing the creator
+ */
+ ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+ throws Exception;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
new file mode 100644
index 0000000..34341a1
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/GeoSpatialIndexCreatorProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+
+
+public interface GeoSpatialIndexCreatorProvider {
+
+ /**
+ * Creates a {@link GeoSpatialIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link GeoSpatialIndexCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
new file mode 100644
index 0000000..c1a20ba
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
+import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FSTType;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Provides parameters for constructing indexes via {@link IndexCreatorProvider}.
+ * The responsibility for ensuring that the correct parameters are supplied for a
+ * particular index type lies with the caller.
+ */
+public interface IndexCreationContext {
+
+ FieldSpec getFieldSpec();
+
+ File getIndexDir();
+
+ boolean isOnHeap();
+
+ int getLengthOfLongestEntry();
+
+ int getMaxNumberOfMultiValueElements();
+
+ int getMaxRowLengthInBytes();
+
+ boolean isSorted();
+
+ int getCardinality();
+
+ int getTotalNumberOfEntries();
+
+ int getTotalDocs();
+
+ boolean hasDictionary();
+
+  /**
+   * Mutable builder for the {@link Common} context shared by all index types.
+   * The index directory and the field spec are mandatory; {@link #build()} rejects a builder
+   * that is missing either. Unless overridden, the context is off-heap, dictionary-encoded,
+   * unsorted, and all numeric statistics are zero.
+   */
+  final class Builder {
+    private File _indexDir;
+    private int _lengthOfLongestEntry;
+    private int _maxNumberOfMultiValueElements;
+    private int _maxRowLengthInBytes;
+    private boolean _onHeap = false;
+    private FieldSpec _fieldSpec;
+    private boolean _sorted;
+    private int _cardinality;
+    private int _totalNumberOfEntries;
+    private int _totalDocs;
+    private boolean _hasDictionary = true;
+
+    /**
+     * Copies the longest entry length, the maximum number of multi-value elements and the maximum
+     * row length in bytes from the given creation info.
+     */
+    public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) {
+      return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
+          .withMaxNumberOfMultiValueElements(columnIndexCreationInfo.getMaxNumberOfMultiValueElements())
+          .withMaxRowLengthInBytes(columnIndexCreationInfo.getMaxRowLengthInBytes());
+    }
+
+    /** Sets the directory the index is created in (mandatory). */
+    public Builder withIndexDir(File indexDir) {
+      _indexDir = indexDir;
+      return this;
+    }
+
+    /** Sets the on-heap flag reported by {@link IndexCreationContext#isOnHeap()}; defaults to {@code false}. */
+    public Builder onHeap(boolean onHeap) {
+      _onHeap = onHeap;
+      return this;
+    }
+
+    /**
+     * Copies the field spec, sortedness, cardinality, total number of entries, total number of
+     * documents and the dictionary flag from the given column metadata.
+     */
+    public Builder withColumnMetadata(ColumnMetadata columnMetadata) {
+      return withFieldSpec(columnMetadata.getFieldSpec())
+          .sorted(columnMetadata.isSorted())
+          .withCardinality(columnMetadata.getCardinality())
+          .withTotalNumberOfEntries(columnMetadata.getTotalNumberOfEntries())
+          .withTotalDocs(columnMetadata.getTotalDocs())
+          .withDictionary(columnMetadata.hasDictionary());
+    }
+
+    public Builder withLengthOfLongestEntry(int lengthOfLongestEntry) {
+      _lengthOfLongestEntry = lengthOfLongestEntry;
+      return this;
+    }
+
+    public Builder withMaxNumberOfMultiValueElements(int maxNumberOfMultiValueElements) {
+      _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+      return this;
+    }
+
+    public Builder withMaxRowLengthInBytes(int maxRowLengthInBytes) {
+      _maxRowLengthInBytes = maxRowLengthInBytes;
+      return this;
+    }
+
+    /** Sets the field spec of the column being indexed (mandatory). */
+    public Builder withFieldSpec(FieldSpec fieldSpec) {
+      _fieldSpec = fieldSpec;
+      return this;
+    }
+
+    public Builder sorted(boolean sorted) {
+      _sorted = sorted;
+      return this;
+    }
+
+    public Builder withCardinality(int cardinality) {
+      _cardinality = cardinality;
+      return this;
+    }
+
+    public Builder withTotalNumberOfEntries(int totalNumberOfEntries) {
+      _totalNumberOfEntries = totalNumberOfEntries;
+      return this;
+    }
+
+    public Builder withTotalDocs(int totalDocs) {
+      _totalDocs = totalDocs;
+      return this;
+    }
+
+    /** Sets whether the column has a dictionary; defaults to {@code true}. */
+    public Builder withDictionary(boolean hasDictionary) {
+      _hasDictionary = hasDictionary;
+      return this;
+    }
+
+    /**
+     * Builds the common context from the values collected so far.
+     *
+     * @return the common context, from which index-specific contexts can be derived
+     * @throws NullPointerException if the index directory or the field spec has not been set
+     */
+    public Common build() {
+      // Name the missing property so a misconfigured caller gets an actionable NPE message.
+      return new Common(Objects.requireNonNull(_indexDir, "indexDir must be set"),
+          _lengthOfLongestEntry, _maxNumberOfMultiValueElements, _maxRowLengthInBytes,
+          _onHeap, Objects.requireNonNull(_fieldSpec, "fieldSpec must be set"),
+          _sorted, _cardinality, _totalNumberOfEntries, _totalDocs, _hasDictionary);
+    }
+  }
+
+ /**
+ * Creates a new {@link Builder} with default settings.
+ * @return a fresh builder instance
+ */
+ static Builder builder() {
+ return new Builder();
+ }
+
+  /**
+   * Immutable {@link IndexCreationContext} holding the parameters common to every index type.
+   * The {@code for*} methods wrap this context together with the extra parameters a specific
+   * index type needs.
+   */
+  final class Common implements IndexCreationContext {
+
+    private final File _indexDir;
+    private final int _lengthOfLongestEntry;
+    private final int _maxNumberOfMultiValueElements;
+    private final int _maxRowLengthInBytes;
+    private final boolean _onHeap;
+    private final FieldSpec _fieldSpec;
+    private final boolean _sorted;
+    private final int _cardinality;
+    private final int _totalNumberOfEntries;
+    private final int _totalDocs;
+    private final boolean _hasDictionary;
+
+    /**
+     * Stores the given values as-is; no validation is performed here (the {@link Builder}
+     * checks the index directory and field spec for null before calling this constructor).
+     */
+    public Common(File indexDir, int lengthOfLongestEntry,
+        int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap,
+        FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries,
+        int totalDocs, boolean hasDictionary) {
+      _indexDir = indexDir;
+      _lengthOfLongestEntry = lengthOfLongestEntry;
+      _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
+      _maxRowLengthInBytes = maxRowLengthInBytes;
+      _onHeap = onHeap;
+      _fieldSpec = fieldSpec;
+      _sorted = sorted;
+      _cardinality = cardinality;
+      _totalNumberOfEntries = totalNumberOfEntries;
+      _totalDocs = totalDocs;
+      _hasDictionary = hasDictionary;
+    }
+
+    @Override
+    public FieldSpec getFieldSpec() {
+      return _fieldSpec;
+    }
+
+    @Override
+    public File getIndexDir() {
+      return _indexDir;
+    }
+
+    @Override
+    public boolean isOnHeap() {
+      return _onHeap;
+    }
+
+    @Override
+    public int getLengthOfLongestEntry() {
+      return _lengthOfLongestEntry;
+    }
+
+    @Override
+    public int getMaxNumberOfMultiValueElements() {
+      return _maxNumberOfMultiValueElements;
+    }
+
+    @Override
+    public int getMaxRowLengthInBytes() {
+      return _maxRowLengthInBytes;
+    }
+
+    @Override
+    public boolean isSorted() {
+      return _sorted;
+    }
+
+    @Override
+    public int getCardinality() {
+      return _cardinality;
+    }
+
+    @Override
+    public int getTotalNumberOfEntries() {
+      return _totalNumberOfEntries;
+    }
+
+    @Override
+    public int getTotalDocs() {
+      return _totalDocs;
+    }
+
+    @Override
+    public boolean hasDictionary() {
+      return _hasDictionary;
+    }
+
+    /** Derives a bloom filter context carrying the given bloom filter config. */
+    public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) {
+      return new BloomFilter(this, bloomFilterConfig);
+    }
+
+    /** Derives a forward index context carrying the compression type and optional column properties. */
+    public Forward forForwardIndex(ChunkCompressionType chunkCompressionType,
+        @Nullable Map<String, Map<String, String>> columnProperties) {
+      return new Forward(this, chunkCompressionType, columnProperties);
+    }
+
+    /** Derives a {@link Text} context for an FST index from the FST type and the sorted unique elements. */
+    public Text forFSTIndex(FSTType fstType, String[] sortedUniqueElementsArray) {
+      return new Text(this, fstType, sortedUniqueElementsArray);
+    }
+
+    /** Derives a geospatial index context carrying the given H3 index config. */
+    public Geospatial forGeospatialIndex(H3IndexConfig h3IndexConfig) {
+      return new Geospatial(this, h3IndexConfig);
+    }
+
+    /** Derives an inverted index context; no extra parameters are needed. */
+    public Inverted forInvertedIndex() {
+      return new Inverted(this);
+    }
+
+    /** Derives a JSON index context; no extra parameters are needed. */
+    public Json forJsonIndex() {
+      return new Json(this);
+    }
+
+    /** Derives a range index context carrying the range index version and the column's min and max values. */
+    public Range forRangeIndex(int rangeIndexVersion, Comparable<?> min, Comparable<?> max) {
+      return new Range(this, rangeIndexVersion, min, max);
+    }
+
+    /** Derives a {@link Text} context for a text index with the given commit-on-close flag. */
+    public Text forTextIndex(boolean commitOnClose) {
+      return new Text(this, commitOnClose);
+    }
+  }
+
+ class Wrapper implements IndexCreationContext {
+
+ private final IndexCreationContext _delegate;
+
+ Wrapper(IndexCreationContext delegate) {
+ _delegate = delegate;
+ }
+
+ @Override
+ public FieldSpec getFieldSpec() {
+ return _delegate.getFieldSpec();
+ }
+
+ @Override
+ public File getIndexDir() {
+ return _delegate.getIndexDir();
+ }
+
+ @Override
+ public boolean isOnHeap() {
+ return _delegate.isOnHeap();
+ }
+
+ @Override
+ public int getLengthOfLongestEntry() {
+ return _delegate.getLengthOfLongestEntry();
+ }
+
+ @Override
+ public int getMaxNumberOfMultiValueElements() {
+ return _delegate.getMaxNumberOfMultiValueElements();
+ }
+
+ @Override
+ public int getMaxRowLengthInBytes() {
+ return _delegate.getMaxRowLengthInBytes();
+ }
+
+ @Override
+ public boolean isSorted() {
+ return _delegate.isSorted();
+ }
+
+ @Override
+ public int getCardinality() {
+ return _delegate.getCardinality();
+ }
+
+ @Override
+ public int getTotalNumberOfEntries() {
+ return _delegate.getTotalNumberOfEntries();
+ }
+
+ @Override
+ public int getTotalDocs() {
+ return _delegate.getTotalDocs();
+ }
+
+ @Override
+ public boolean hasDictionary() {
+ return _delegate.hasDictionary();
+ }
+ }
+
+ class BloomFilter extends Wrapper {
+
+ private final BloomFilterConfig _bloomFilterConfig;
+
+ public BloomFilter(IndexCreationContext wrapped, BloomFilterConfig bloomFilterConfig) {
+ super(wrapped);
+ _bloomFilterConfig = bloomFilterConfig;
+ }
+
+ @Nullable
+ public BloomFilterConfig getBloomFilterConfig() {
+ return _bloomFilterConfig;
+ }
+ }
+
+ class Forward extends Wrapper {
+
+ private final ChunkCompressionType _chunkCompressionType;
+ private final Map<String, Map<String, String>> _columnProperties;
+
+ Forward(IndexCreationContext delegate,
+ ChunkCompressionType chunkCompressionType,
+ @Nullable Map<String, Map<String, String>> columnProperties) {
+ super(delegate);
+ _chunkCompressionType = chunkCompressionType;
+ _columnProperties = columnProperties;
+ }
+
+ public ChunkCompressionType getChunkCompressionType() {
+ return _chunkCompressionType;
+ }
+
+ @Nullable
+ public Map<String, Map<String, String>> getColumnProperties() {
+ return _columnProperties;
+ }
+ }
+
+ class Geospatial extends Wrapper {
+
+ private final H3IndexConfig _h3IndexConfig;
+
+ Geospatial(IndexCreationContext delegate, H3IndexConfig h3IndexConfig) {
+ super(delegate);
+ _h3IndexConfig = h3IndexConfig;
+ }
+
+ public H3IndexConfig getH3IndexConfig() {
+ return _h3IndexConfig;
+ }
+ }
+
+ class Inverted extends Wrapper {
+
+ Inverted(IndexCreationContext delegate) {
+ super(delegate);
+ }
+ }
+
+ class Json extends Wrapper {
+
+ Json(IndexCreationContext delegate) {
+ super(delegate);
+ }
+ }
+
+ class Range extends Wrapper {
+
+ private final Comparable<?> _min;
+ private final Comparable<?> _max;
+ private final int _rangeIndexVersion;
+
+
+ Range(IndexCreationContext delegate, int rangeIndexVersion, Comparable<?> min, Comparable<?> max) {
+ super(delegate);
+ _rangeIndexVersion = rangeIndexVersion;
+ _min = min;
+ _max = max;
+ }
+
+ public Comparable<?> getMin() {
+ return _min;
+ }
+
+ public Comparable<?> getMax() {
+ return _max;
+ }
+
+ public int getRangeIndexVersion() {
+ return _rangeIndexVersion;
+ }
+ }
+
+ class Text extends Wrapper {
+ private final boolean _commitOnClose;
+ private final boolean _isFst;
+ private final FSTType _fstType;
+ private final String[] _sortedUniqueElementsArray;
+
+ /**
+ * For text indexes
+ */
+ public Text(IndexCreationContext wrapped, boolean commitOnClose) {
+ super(wrapped);
+ _commitOnClose = commitOnClose;
+ _fstType = null;
+ _sortedUniqueElementsArray = null;
+ _isFst = false;
+ }
+
+ /**
+ * For FST indexes
+ */
+ public Text(IndexCreationContext wrapped, FSTType fstType, String[] sortedUniqueElementsArray) {
+ super(wrapped);
+ _commitOnClose = true;
+ _fstType = fstType;
+ _sortedUniqueElementsArray = sortedUniqueElementsArray;
+ _isFst = true;
+ }
+
+ public boolean isCommitOnClose() {
+ return _commitOnClose;
+ }
+
+ public FSTType getFstType() {
+ return _fstType;
+ }
+
+ public boolean isFst() {
+ return _isFst;
+ }
+
+ public String[] getSortedUniqueElementsArray() {
+ return _sortedUniqueElementsArray;
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
new file mode 100644
index 0000000..4dd1a02
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvider.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+/**
+ * Plugin interface to abstract index creation.
+ */
+public interface IndexCreatorProvider
+ extends ForwardIndexCreatorProvider, InvertedIndexCreatorProvider, JsonIndexCreatorProvider,
+ TextIndexCreatorProvider, GeoSpatialIndexCreatorProvider, RangeIndexCreatorProvider,
+ BloomFilterCreatorProvider {
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
new file mode 100644
index 0000000..40466f1
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Plugin registration point to allow index creation logic to be swapped out.
+ * Plugins should not reimplement Pinot's default index creation logic.
+ * Users provide an override to Pinot's index creation logic. This is simplified
+ * by extending {@link IndexCreatorProviders.Default}
+ */
+public final class IndexCreatorProviders {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreatorProviders.class);
+
+ private static final IndexCreatorProvider DEFAULT = defaultProvider();
+ private static final AtomicReference<IndexCreatorProvider> REGISTRATION = new AtomicReference<>(DEFAULT);
+
+ private IndexCreatorProviders() {
+ }
+
+ /**
+ * The caller provides a decorator to wrap the default provider, which allows plugins to create
+ * a delegation chain.
+ * @param provider index creation override
+ * @return true if no override was registered before; it only takes effect if the provider was not yet obtained.
+ */
+ public static boolean registerProvider(IndexCreatorProvider provider) {
+ return REGISTRATION.compareAndSet(DEFAULT, provider);
+ }
+
+ /**
+ * Obtain the registered index creator provider. If the user has provided an override, then it will be used instead.
+ * If the user has not provided an override yet, then this action will prevent them from doing so.
+ * @return the global index provision logic.
+ */
+ public static IndexCreatorProvider getIndexCreatorProvider() {
+ return Holder.PROVIDER;
+ }
+
+ private static final class Holder {
+ public static final IndexCreatorProvider PROVIDER = REGISTRATION.get();
+ }
+
+ private static IndexCreatorProvider defaultProvider() {
+ // use MethodHandle to break circular dependency and keep implementation details encapsulated within
+ // pinot-segment-local
+ String className = "org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider";
+ try {
+ Class<?> clazz = Class.forName(className, false, IndexCreatorProviders.class.getClassLoader());
+ return (IndexCreatorProvider) MethodHandles.publicLookup()
+ .findConstructor(clazz, MethodType.methodType(void.class)).invoke();
+ } catch (Throwable missing) {
+ LOGGER.error("could not construct MethodHandle for {}", className, missing);
+ // most likely pinot-segment-local isn't on the classpath, in which case
+ // no indexes will be created, so it's ok to return null
+ return null;
+ }
+ }
+
+ /**
+ * Extend this class to override index creation
+ */
+ public static class Default implements IndexCreatorProvider {
+
+ @Override
+ public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newBloomFilterCreator(context);
+ }
+
+ @Override
+ public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
+ throws Exception {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newForwardIndexCreator(context);
+ }
+
+ @Override
+ public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newGeoSpatialIndexCreator(context);
+ }
+
+ @Override
+ public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newInvertedIndexCreator(context);
+ }
+
+ @Override
+ public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newJsonIndexCreator(context);
+ }
+
+ @Override
+ public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newRangeIndexCreator(context);
+ }
+
+ @Override
+ public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+ throws IOException {
+ if (DEFAULT == null) {
+ throw new UnsupportedOperationException("default implementation not present on classpath");
+ }
+ return DEFAULT.newTextIndexCreator(context);
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
new file mode 100644
index 0000000..2bf7f1a
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/InvertedIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
+
+
+public interface InvertedIndexCreatorProvider {
+ /**
+ * Creates a {@link DictionaryBasedInvertedIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link DictionaryBasedInvertedIndexCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
new file mode 100644
index 0000000..22e7f54
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/JsonIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
+
+
+public interface JsonIndexCreatorProvider {
+ /**
+ * Creates a {@link JsonIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link JsonIndexCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
new file mode 100644
index 0000000..a3abfc6
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/RangeIndexCreatorProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
+
+
+public interface RangeIndexCreatorProvider {
+ /**
+ * Creates a {@link CombinedInvertedIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link CombinedInvertedIndexCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
new file mode 100644
index 0000000..05f53ea
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/TextIndexCreatorProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
+
+
+public interface TextIndexCreatorProvider {
+
+ /**
+ * Creates a {@link TextIndexCreator} from information about index creation.
+ * This allows a plugin to pattern match index creation information to select
+ * an appropriate implementation.
+ * @param context context about the index creation.
+ * @return a {@link TextIndexCreator}
+ * @throws IOException whenever something goes wrong matching or constructing the creator
+ */
+ TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context)
+ throws IOException;
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
index 19dbb2a..519f7c1 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java
@@ -289,6 +289,10 @@ public class ColumnMetadataImpl implements ColumnMetadata {
return builder.build();
}
+ public static Builder builder() {
+ return new Builder();
+ }
+
public static class Builder {
private FieldSpec _fieldSpec;
private int _totalDocs;
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
new file mode 100644
index 0000000..bfaebb3
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class IndexCreatorProvidersTest {
+
+ @Test
+ public void indexCreatorProvidersLoadableWithoutDefaultImplementation()
+ throws IOException {
+ BloomFilterCreator mockBloomFilterCreator = mock(BloomFilterCreator.class);
+ assertTrue(IndexCreatorProviders.registerProvider(new IndexCreatorProviders.Default() {
+ @Override
+ public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) {
+ return mockBloomFilterCreator;
+ }
+ }));
+ // it's ok to load external overrides without an internal implementation present, e.g. for testing
+ assertEquals(mockBloomFilterCreator, IndexCreatorProviders.getIndexCreatorProvider()
+ .newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)));
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void whenDefaultImplementationMissingThrowUnsupportedOperationException()
+ throws IOException {
+ // the implementation is missing so no indexes will be created anyway...
+ new IndexCreatorProviders.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class));
+ }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index d349da8..0367203 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -31,7 +31,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -314,7 +314,7 @@ public class DictionaryToRawIndexConverter {
int numDocs = segment.getSegmentMetadata().getTotalDocs();
int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
- try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
+ try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
.getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org