You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original archive page) was not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/22 02:37:51 UTC
[pinot] branch master updated: Add help methods to check if segment needs reprocessing (#7925)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 29bcc82 Add help methods to check if segment needs reprocessing (#7925)
29bcc82 is described below
commit 29bcc827bf259157250d5756acbbd38a7296d569
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Dec 21 18:37:30 2021 -0800
Add help methods to check if segment needs reprocessing (#7925)
---
.../immutable/ImmutableSegmentLoader.java | 20 +++
.../local/segment/index/loader/IndexHandler.java | 12 +-
.../segment/index/loader/IndexHandlerFactory.java | 35 +++---
.../segment/index/loader/SegmentPreProcessor.java | 74 ++++++++++-
.../loader/bloomfilter/BloomFilterHandler.java | 45 +++----
.../ColumnMinMaxValueGenerator.java | 26 +++-
.../defaultcolumn/BaseDefaultColumnHandler.java | 6 +
.../loader/defaultcolumn/DefaultColumnHandler.java | 7 ++
.../loader/invertedindex/FSTIndexHandler.java | 50 ++++----
.../index/loader/invertedindex/H3IndexHandler.java | 73 ++++++-----
.../loader/invertedindex/InvertedIndexHandler.java | 47 +++----
.../loader/invertedindex/JsonIndexHandler.java | 60 +++++----
.../loader/invertedindex/RangeIndexHandler.java | 66 +++++-----
.../loader/invertedindex/TextIndexHandler.java | 62 ++++-----
.../local/segment/index/loader/LoaderTest.java | 27 ++++
.../index/loader/SegmentPreProcessorTest.java | 138 +++++++++++++++++++--
.../pinot/segment/spi/store/SegmentDirectory.java | 8 +-
17 files changed, 532 insertions(+), 224 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index d1a31cb..164d86a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -175,6 +175,26 @@ public class ImmutableSegmentLoader {
return segment;
}
+ /**
+ * Check segment directory against the table config and schema to see if any preprocessing is needed,
+ * like changing segment format, adding new indices or updating default columns.
+ */
+ public static boolean needPreprocess(SegmentDirectory segmentDirectory, IndexLoadingConfig indexLoadingConfig,
+ @Nullable Schema schema)
+ throws Exception {
+ if (needConvertSegmentFormat(indexLoadingConfig, segmentDirectory.getSegmentMetadata())) {
+ return true;
+ }
+ SegmentPreProcessor preProcessor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema);
+ return preProcessor.needProcess();
+ }
+
+ private static boolean needConvertSegmentFormat(IndexLoadingConfig indexLoadingConfig,
+ SegmentMetadataImpl segmentMetadata) {
+ SegmentVersion segmentVersionToLoad = indexLoadingConfig.getSegmentVersion();
+ return segmentVersionToLoad != null && segmentVersionToLoad != segmentMetadata.getVersion();
+ }
+
private static void convertSegmentFormat(File indexDir, IndexLoadingConfig indexLoadingConfig,
SegmentMetadataImpl localSegmentMetadata)
throws Exception {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
index f0ebe8c..5b2ba47 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.segment.local.segment.index.loader;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+
+
/**
* Interface for index handlers, which update the corresponding type of indices,
* like adding, removing or converting the format.
@@ -26,6 +30,12 @@ public interface IndexHandler {
/**
* Adds new indices and removes obsolete indices.
*/
- void updateIndices()
+ void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception;
+
+ /**
+ * Check if there is a need to add new indices or removes obsolete indices.
+ * @return true if there is a need to update.
+ */
+ boolean needUpdateIndices(SegmentDirectory.Reader segmentReader);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index d06c1ba..bc48ff9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.segment.local.segment.index.loader;
-import java.io.File;
import org.apache.pinot.segment.local.segment.index.loader.bloomfilter.BloomFilterHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler;
@@ -27,7 +26,6 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonInd
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
-import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
@@ -37,31 +35,34 @@ public class IndexHandlerFactory {
private IndexHandlerFactory() {
}
- private static final IndexHandler NO_OP_HANDLER = () -> {
+ private static final IndexHandler NO_OP_HANDLER = new IndexHandler() {
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
+ }
+
+ @Override
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ return false;
+ }
};
- public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata,
- IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) {
- IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+ public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadataImpl segmentMetadata,
+ IndexLoadingConfig indexLoadingConfig) {
switch (type) {
case INVERTED_INDEX:
- return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
- indexCreatorProvider);
+ return new InvertedIndexHandler(segmentMetadata, indexLoadingConfig);
case RANGE_INDEX:
- return new RangeIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
- indexCreatorProvider);
+ return new RangeIndexHandler(segmentMetadata, indexLoadingConfig);
case TEXT_INDEX:
- return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+ return new TextIndexHandler(segmentMetadata, indexLoadingConfig);
case FST_INDEX:
- return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
- indexLoadingConfig.getFSTIndexType(), indexCreatorProvider);
+ return new FSTIndexHandler(segmentMetadata, indexLoadingConfig);
case JSON_INDEX:
- return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+ return new JsonIndexHandler(segmentMetadata, indexLoadingConfig);
case H3_INDEX:
- return new H3IndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, indexCreatorProvider);
+ return new H3IndexHandler(segmentMetadata, indexLoadingConfig);
case BLOOM_FILTER:
- return new BloomFilterHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
- indexCreatorProvider);
+ return new BloomFilterHandler(segmentMetadata, indexLoadingConfig);
default:
return NO_OP_HANDLER;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 82a30a7..47b01ce 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,8 @@ import org.apache.pinot.segment.local.startree.StarTreeBuilderUtils;
import org.apache.pinot.segment.local.startree.v2.builder.MultipleTreesBuilder;
import org.apache.pinot.segment.local.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -96,9 +98,10 @@ public class SegmentPreProcessor implements AutoCloseable {
}
// Update single-column indices, like inverted index, json index etc.
+ IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
for (ColumnIndexType type : ColumnIndexType.values()) {
- IndexHandlerFactory.getIndexHandler(type, _indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter)
- .updateIndices();
+ IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
+ .updateIndices(segmentWriter, indexCreatorProvider);
}
// Create/modify/remove star-trees if required.
@@ -120,6 +123,73 @@ public class SegmentPreProcessor implements AutoCloseable {
}
}
+ /**
+ * This method checks if there is any discrepancy between the segment and current table config and schema.
+ * If so, it returns true indicating the segment needs to be reprocessed. Right now, the default columns,
+ * all types of indices and column min/max values are checked against what's set in table config and schema.
+ */
+ public boolean needProcess()
+ throws Exception {
+ if (_segmentMetadata.getTotalDocs() == 0) {
+ return false;
+ }
+ try (SegmentDirectory.Reader segmentReader = _segmentDirectory.createReader()) {
+ // Check if there is need to update default columns according to the schema.
+ if (_schema != null) {
+ DefaultColumnHandler defaultColumnHandler = DefaultColumnHandlerFactory
+ .getDefaultColumnHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, _schema, null);
+ if (defaultColumnHandler.needUpdateDefaultColumns()) {
+ return true;
+ }
+ }
+ // Check if there is need to update single-column indices, like inverted index, json index etc.
+ for (ColumnIndexType type : ColumnIndexType.values()) {
+ if (IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
+ .needUpdateIndices(segmentReader)) {
+ return true;
+ }
+ }
+ // Check if there is need to create/modify/remove star-trees.
+ if (needProcessStarTrees()) {
+ return true;
+ }
+ // Check if there is need to update column min max value.
+ if (needUpdateColumnMinMaxValue()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean needUpdateColumnMinMaxValue() {
+ ColumnMinMaxValueGeneratorMode columnMinMaxValueGeneratorMode =
+ _indexLoadingConfig.getColumnMinMaxValueGeneratorMode();
+ if (columnMinMaxValueGeneratorMode == ColumnMinMaxValueGeneratorMode.NONE) {
+ return false;
+ }
+ ColumnMinMaxValueGenerator columnMinMaxValueGenerator =
+ new ColumnMinMaxValueGenerator(_segmentMetadata, null, columnMinMaxValueGeneratorMode);
+ return columnMinMaxValueGenerator.needAddColumnMinMaxValue();
+ }
+
+ private boolean needProcessStarTrees() {
+ // Check if there is need to create/modify/remove star-trees.
+ if (!_indexLoadingConfig.isEnableDynamicStarTreeCreation()) {
+ return false;
+ }
+ List<StarTreeV2BuilderConfig> starTreeBuilderConfigs = StarTreeBuilderUtils
+ .generateBuilderConfigs(_indexLoadingConfig.getStarTreeIndexConfigs(),
+ _indexLoadingConfig.isEnableDefaultStarTree(), _segmentMetadata);
+ List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList();
+ // There are existing star-trees, but if they match the builder configs exactly,
+ // then there is no need to generate the star-trees
+ if (starTreeMetadataList != null && !StarTreeBuilderUtils
+ .shouldRemoveExistingStarTrees(starTreeBuilderConfigs, starTreeMetadataList)) {
+ return false;
+ }
+ return !starTreeBuilderConfigs.isEmpty();
+ }
+
private void processStarTrees()
throws Exception {
// Create/modify/remove star-trees if required
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
index f992344..065f693 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/bloomfilter/BloomFilterHandler.java
@@ -35,12 +35,13 @@ import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.BloomFilterCreatorProvider;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -54,32 +55,32 @@ import org.slf4j.LoggerFactory;
public class BloomFilterHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterHandler.class);
- private final File _indexDir;
- private final SegmentMetadataImpl _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
+ private final SegmentMetadata _segmentMetadata;
private final Map<String, BloomFilterConfig> _bloomFilterConfigs;
- private final BloomFilterCreatorProvider _indexCreatorProvider;
- public BloomFilterHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, BloomFilterCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
- _segmentWriter = segmentWriter;
+ public BloomFilterHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
_bloomFilterConfigs = indexLoadingConfig.getBloomFilterConfigs();
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ Set<String> columnsToAddBF = new HashSet<>(_bloomFilterConfigs.keySet());
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
+ return !existingColumns.equals(columnsToAddBF);
+ }
+
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
Set<String> columnsToAddBF = new HashSet<>(_bloomFilterConfigs.keySet());
// Remove indices not set in table config any more.
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.BLOOM_FILTER);
for (String column : existingColumns) {
if (!columnsToAddBF.remove(column)) {
LOGGER.info("Removing existing bloom filter from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.BLOOM_FILTER);
+ segmentWriter.removeIndex(column, ColumnIndexType.BLOOM_FILTER);
LOGGER.info("Removed existing bloom filter from segment: {}, column: {}", segmentName, column);
}
}
@@ -87,19 +88,21 @@ public class BloomFilterHandler implements IndexHandler {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
if (columnMetadata.hasDictionary()) {
- createBloomFilterForColumn(columnMetadata);
+ createBloomFilterForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
// TODO: Support raw index
}
}
}
- private void createBloomFilterForColumn(ColumnMetadata columnMetadata)
+ private void createBloomFilterForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ BloomFilterCreatorProvider indexCreatorProvider)
throws Exception {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
String columnName = columnMetadata.getColumnName();
- File bloomFilterFileInProgress = new File(_indexDir, columnName + ".bloom.inprogress");
- File bloomFilterFile = new File(_indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
+ File bloomFilterFileInProgress = new File(indexDir, columnName + ".bloom.inprogress");
+ File bloomFilterFile = new File(indexDir, columnName + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION);
if (!bloomFilterFileInProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -115,10 +118,10 @@ public class BloomFilterHandler implements IndexHandler {
BloomFilterConfig bloomFilterConfig = _bloomFilterConfigs.get(columnName);
LOGGER.info("Creating new bloom filter for segment: {}, column: {} with config: {}", segmentName, columnName,
bloomFilterConfig);
- try (BloomFilterCreator bloomFilterCreator = _indexCreatorProvider.newBloomFilterCreator(
- IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ try (BloomFilterCreator bloomFilterCreator = indexCreatorProvider.newBloomFilterCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
.build().forBloomFilter(bloomFilterConfig));
- Dictionary dictionary = getDictionaryReader(columnMetadata, _segmentWriter)) {
+ Dictionary dictionary = getDictionaryReader(columnMetadata, segmentWriter)) {
int length = dictionary.length();
for (int i = 0; i < length; i++) {
bloomFilterCreator.add(dictionary.getStringValue(i));
@@ -128,7 +131,7 @@ public class BloomFilterHandler implements IndexHandler {
// For v3, write the generated bloom filter file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, bloomFilterFile, ColumnIndexType.BLOOM_FILTER);
}
// Delete the marker file.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
index da42aab..36cf3fd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/columnminmaxvalue/ColumnMinMaxValueGenerator.java
@@ -56,10 +56,25 @@ public class ColumnMinMaxValueGenerator {
_columnMinMaxValueGeneratorMode = columnMinMaxValueGeneratorMode;
}
+ public boolean needAddColumnMinMaxValue() {
+ for (String column : getColumnsToAddMinMaxValue()) {
+ if (needAddColumnMinMaxValueForColumn(column)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void addColumnMinMaxValue()
throws Exception {
Preconditions.checkState(_columnMinMaxValueGeneratorMode != ColumnMinMaxValueGeneratorMode.NONE);
+ for (String column : getColumnsToAddMinMaxValue()) {
+ addColumnMinMaxValueForColumn(column);
+ }
+ saveMetadata();
+ }
+ private Set<String> getColumnsToAddMinMaxValue() {
Schema schema = _segmentMetadata.getSchema();
Set<String> columnsToAddMinMaxValue = new HashSet<>(schema.getPhysicalColumnNames());
@@ -77,10 +92,13 @@ public class ColumnMinMaxValueGenerator {
default:
break;
}
- for (String column : columnsToAddMinMaxValue) {
- addColumnMinMaxValueForColumn(column);
- }
- saveMetadata();
+ return columnsToAddMinMaxValue;
+ }
+
+ private boolean needAddColumnMinMaxValueForColumn(String columnName) {
+ ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(columnName);
+ return columnMetadata.hasDictionary() && columnMetadata.getMinValue() == null
+ && columnMetadata.getMaxValue() == null;
}
private void addColumnMinMaxValueForColumn(String columnName)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 6e791cc..45c0554 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -126,6 +126,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
_segmentProperties = SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
}
+ @Override
+ public boolean needUpdateDefaultColumns() {
+ Map<String, DefaultColumnAction> defaultColumnActionMap = computeDefaultColumnActionMap();
+ return !defaultColumnActionMap.isEmpty();
+ }
+
/**
* {@inheritDoc}
*/
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
index 9d1baae..88db502 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/DefaultColumnHandler.java
@@ -28,4 +28,11 @@ public interface DefaultColumnHandler {
*/
void updateDefaultColumns()
throws Exception;
+
+ /**
+ * Check if there is a need to add/remove/update the auto-generated default columns
+ * for the segment, according to the current table schema.
+ * @return true if there is a need to update.
+ */
+ boolean needUpdateDefaultColumns();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
index 292bf0c..c594f87 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/FSTIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
@@ -66,33 +67,32 @@ import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EX
public class FSTIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(FSTIndexHandler.class);
- private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final FSTType _fstType;
- private final TextIndexCreatorProvider _indexCreatorProvider;
- public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, FSTType fstType, TextIndexCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
+ public FSTIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
+ _fstType = indexLoadingConfig.getFSTIndexType();
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
- _fstType = fstType;
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
+ return !existingColumns.equals(_columnsToAddIdx);
+ }
+
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
// Remove indices not set in table config any more
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.FST_INDEX);
for (String column : existingColumns) {
if (!_columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing FST index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.FST_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.FST_INDEX);
LOGGER.info("Removed existing FST index from segment: {}, column: {}", segmentName, column);
}
}
@@ -100,7 +100,7 @@ public class FSTIndexHandler implements IndexHandler {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
checkUnsupportedOperationsForFSTIndex(columnMetadata);
- createFSTIndexForColumn(columnMetadata);
+ createFSTIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
@@ -121,12 +121,14 @@ public class FSTIndexHandler implements IndexHandler {
}
}
- private void createFSTIndexForColumn(ColumnMetadata columnMetadata)
+ private void createFSTIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ TextIndexCreatorProvider indexCreatorProvider)
throws IOException {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
- String column = columnMetadata.getColumnName();
- File inProgress = new File(_indexDir, column + ".fst.inprogress");
- File fstIndexFile = new File(_indexDir, column + FST_INDEX_FILE_EXTENSION);
+ String columnName = columnMetadata.getColumnName();
+ File inProgress = new File(indexDir, columnName + ".fst.inprogress");
+ File fstIndexFile = new File(indexDir, columnName + FST_INDEX_FILE_EXTENSION);
if (!inProgress.exists()) {
// Create a marker file.
@@ -135,14 +137,14 @@ public class FSTIndexHandler implements IndexHandler {
FileUtils.deleteQuietly(fstIndexFile);
}
- LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName,
+ LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", columnName, segmentName,
columnMetadata.getCardinality());
- TextIndexCreator fstIndexCreator = _indexCreatorProvider.newTextIndexCreator(
- IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
- .build().forFSTIndex(_fstType, null));
+ TextIndexCreator fstIndexCreator = indexCreatorProvider.newTextIndexCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
+ .forFSTIndex(_fstType, null));
- try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+ try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
for (int dictId = 0; dictId < dictionary.length(); dictId++) {
fstIndexCreator.add(dictionary.getStringValue(dictId));
}
@@ -151,11 +153,11 @@ public class FSTIndexHandler implements IndexHandler {
// For v3, write the generated range index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, column, fstIndexFile, ColumnIndexType.FST_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, fstIndexFile, ColumnIndexType.FST_INDEX);
}
// Delete the marker file.
FileUtils.deleteQuietly(inProgress);
- LOGGER.info("Created FST index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Created FST index for segment: {}, column: {}", segmentName, columnName);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
index d2fceaa..e58ccef 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/H3IndexHandler.java
@@ -25,19 +25,19 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.geospatial.OffHeapH3IndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.creator.GeoSpatialIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
@@ -52,49 +52,51 @@ import org.slf4j.LoggerFactory;
public class H3IndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(H3IndexHandler.class);
- private final File _indexDir;
- private final SegmentMetadataImpl _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
+ private final SegmentMetadata _segmentMetadata;
private final Map<String, H3IndexConfig> _h3Configs;
- private final IndexCreatorProvider _indexCreatorProvider;
- public H3IndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
+ /**
+ * Creates a handler that reconciles the H3 indices of one segment with the configured H3 columns. The segment
+ * writer and index creator provider are not held here; they are passed to updateIndices().
+ *
+ * @param segmentMetadata metadata of the segment to process; also supplies the segment index directory
+ * @param indexLoadingConfig source of the per-column H3 index configs (getH3IndexConfigs())
+ */
+ public H3IndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
_h3Configs = indexLoadingConfig.getH3IndexConfigs();
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ // Must agree with updateIndices(): report work only when it would actually remove or create an index. A plain
+ // set comparison also fires for configured columns that do not exist in this segment, which updateIndices()
+ // skips, so such a segment would be reported as needing reprocessing on every load.
+ Set<String> columnsToAddIdx = new HashSet<>(_h3Configs.keySet());
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
+ // An existing H3 index on a column that is no longer configured has to be removed.
+ for (String column : existingColumns) {
+ if (!columnsToAddIdx.remove(column)) {
+ return true;
+ }
+ }
+ // A configured column without an index only gets one if the column exists in this segment.
+ for (String column : columnsToAddIdx) {
+ if (_segmentMetadata.getColumnMetadataFor(column) != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Removes H3 indices from columns that are no longer configured and creates H3 indices for configured columns
+ * that exist in this segment but do not have one yet.
+ */
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
+ // Local copy: the removals below must not touch _h3Configs.
Set<String> columnsToAddIdx = new HashSet<>(_h3Configs.keySet());
// Remove indices not set in table config any more
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.H3_INDEX);
for (String column : existingColumns) {
+ // remove() returns false when the column is not configured; otherwise it drops an already-indexed column.
if (!columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing H3 index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.H3_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.H3_INDEX);
LOGGER.info("Removed existing H3 index from segment: {}, column: {}", segmentName, column);
}
}
+ // Remaining entries are configured columns without an H3 index; columns absent from the segment are skipped.
for (String column : columnsToAddIdx) {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
- createH3IndexForColumn(columnMetadata);
+ createH3IndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
- private void createH3IndexForColumn(ColumnMetadata columnMetadata)
+ /**
+ * Builds the H3 index for one BYTES column, guarded by an ".inprogress" marker file in the segment index
+ * directory. For v3 segments, the generated file is then written into the segment directory through
+ * LoaderUtils.writeIndexToV3Format. The marker is deleted on success.
+ */
+ private void createH3IndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ GeoSpatialIndexCreatorProvider indexCreatorProvider)
throws Exception {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
- String column = columnMetadata.getColumnName();
- File inProgress = new File(_indexDir, column + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION + ".inprogress");
- File h3IndexFile = new File(_indexDir, column + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
+ String columnName = columnMetadata.getColumnName();
+ File inProgress = new File(indexDir, columnName + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION + ".inprogress");
+ File h3IndexFile = new File(indexDir, columnName + V1Constants.Indexes.H3_INDEX_FILE_EXTENSION);
if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -108,34 +110,36 @@ public class H3IndexHandler implements IndexHandler {
}
// Create new H3 index for the column.
- LOGGER.info("Creating new H3 index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Creating new H3 index for segment: {}, column: {}", segmentName, columnName);
Preconditions
.checkState(columnMetadata.getDataType() == DataType.BYTES, "H3 index can only be applied to BYTES columns");
+ // Dictionary-encoded and raw columns are read differently from the forward index.
if (columnMetadata.hasDictionary()) {
- handleDictionaryBasedColumn(columnMetadata);
+ handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
} else {
- handleNonDictionaryBasedColumn(columnMetadata);
+ handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
// For v3, write the generated H3 index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, column, h3IndexFile, ColumnIndexType.H3_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, h3IndexFile, ColumnIndexType.H3_INDEX);
}
// Delete the marker file.
FileUtils.deleteQuietly(inProgress);
- LOGGER.info("Created H3 index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Created H3 index for segment: {}, column: {}", segmentName, columnName);
}
- private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ /**
+ * Iterates over all documents of a dictionary-encoded column and feeds them into a geospatial index creator
+ * built from the column's H3 config. The forward index reader, its context, the dictionary and the creator are
+ * all closed by try-with-resources. The loop body is elided from this diff.
+ */
+ private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ GeoSpatialIndexCreatorProvider indexCreatorProvider)
throws IOException {
+ File indexDir = _segmentMetadata.getIndexDir();
String columnName = columnMetadata.getColumnName();
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
- GeoSpatialIndexCreator h3IndexCreator = _indexCreatorProvider.newGeoSpatialIndexCreator(
- IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata)
+ Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata);
+ GeoSpatialIndexCreator h3IndexCreator = indexCreatorProvider.newGeoSpatialIndexCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
.build().forGeospatialIndex(_h3Configs.get(columnName)))) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
@@ -146,13 +150,16 @@ public class H3IndexHandler implements IndexHandler {
}
}
- private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ GeoSpatialIndexCreatorProvider indexCreatorProvider)
throws Exception {
+ File indexDir = _segmentMetadata.getIndexDir();
String columnName = columnMetadata.getColumnName();
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- OffHeapH3IndexCreator h3IndexCreator = new OffHeapH3IndexCreator(_indexDir, columnName,
- _h3Configs.get(columnName).getResolution())) {
+ GeoSpatialIndexCreator h3IndexCreator = indexCreatorProvider.newGeoSpatialIndexCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+ .build().forGeospatialIndex(_h3Configs.get(columnName)))) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
h3IndexCreator.add(GeometrySerializer.deserialize(forwardIndexReader.getBytes(i, readerContext)));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
index 693ca32..91d3ae7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/InvertedIndexHandler.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.InvertedIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
@@ -45,32 +46,32 @@ import org.slf4j.LoggerFactory;
public class InvertedIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(InvertedIndexHandler.class);
- private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
- private final InvertedIndexCreatorProvider _indexCreatorProvider;
- public InvertedIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, InvertedIndexCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
+ /**
+ * Creates a handler that reconciles the inverted indices of one segment with the configured columns. The segment
+ * writer and index creator provider are passed to updateIndices() instead of being held.
+ *
+ * @param segmentMetadata metadata of the segment to process; also supplies the segment index directory
+ * @param indexLoadingConfig source of the configured inverted index columns, copied into a mutable HashSet
+ */
+ public InvertedIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getInvertedIndexColumns());
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ Set<String> existingColumns =
+ segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
+ return !existingColumns.equals(_columnsToAddIdx);
+ }
+
+ /**
+ * Removes inverted indices from columns that are no longer configured and creates them for configured,
+ * dictionary-encoded, unsorted columns of this segment that do not have one yet.
+ */
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws IOException {
// Remove indices not set in table config any more.
String segmentName = _segmentMetadata.getName();
Set<String> existingColumns =
- _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
+ segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
for (String column : existingColumns) {
+ // NOTE(review): remove() mutates the _columnsToAddIdx field itself (H3IndexHandler uses a local copy), so
+ // after this call the instance no longer holds the configured column set; later needUpdateIndices() or
+ // updateIndices() calls on the same instance see the reduced set.
if (!_columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing inverted index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.INVERTED_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.INVERTED_INDEX);
LOGGER.info("Removed existing inverted index from segment: {}, column: {}", segmentName, column);
}
}
@@ -78,17 +79,19 @@ public class InvertedIndexHandler implements IndexHandler {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
// Only create inverted index on dictionary-encoded unsorted columns.
if (columnMetadata != null && !columnMetadata.isSorted() && columnMetadata.hasDictionary()) {
- createInvertedIndexForColumn(columnMetadata);
+ createInvertedIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
- private void createInvertedIndexForColumn(ColumnMetadata columnMetadata)
+ /**
+ * Builds the bitmap inverted index for one column from its forward index, guarded by a ".inv.inprogress" marker
+ * file in the segment index directory. For v3 segments, the generated file is then written into the segment
+ * directory through LoaderUtils.writeIndexToV3Format. The marker is deleted on success.
+ */
+ private void createInvertedIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ InvertedIndexCreatorProvider indexCreatorProvider)
throws IOException {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
- String column = columnMetadata.getColumnName();
- File inProgress = new File(_indexDir, column + ".inv.inprogress");
- File invertedIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
+ String columnName = columnMetadata.getColumnName();
+ File inProgress = new File(indexDir, columnName + ".inv.inprogress");
+ File invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION);
if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -102,12 +105,12 @@ public class InvertedIndexHandler implements IndexHandler {
}
// Create new inverted index for the column.
- LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Creating new inverted index for segment: {}, column: {}", segmentName, columnName);
int numDocs = columnMetadata.getTotalDocs();
+ // Creator and forward index reader are both closed by the nested try-with-resources blocks.
- try (DictionaryBasedInvertedIndexCreator creator = _indexCreatorProvider.newInvertedIndexCreator(
- IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ try (DictionaryBasedInvertedIndexCreator creator = indexCreatorProvider.newInvertedIndexCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
.forInvertedIndex())) {
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
if (columnMetadata.isSingleValue()) {
// Single-value column.
@@ -128,12 +131,12 @@ public class InvertedIndexHandler implements IndexHandler {
// For v3, write the generated inverted index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, column, invertedIndexFile, ColumnIndexType.INVERTED_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, invertedIndexFile, ColumnIndexType.INVERTED_INDEX);
}
// Delete the marker file.
FileUtils.deleteQuietly(inProgress);
- LOGGER.info("Created inverted index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Created inverted index for segment: {}, column: {}", segmentName, columnName);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
index 27b8ade..c114247 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/JsonIndexHandler.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.JsonIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator;
@@ -48,48 +49,49 @@ import org.slf4j.LoggerFactory;
public class JsonIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonIndexHandler.class);
- private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
private final HashSet<String> _columnsToAddIdx;
- private final JsonIndexCreatorProvider _indexCreatorProvider;
- public JsonIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, JsonIndexCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
+ /**
+ * Creates a handler that reconciles the JSON indices of one segment with the configured columns. The segment
+ * writer and index creator provider are passed to updateIndices() instead of being held.
+ *
+ * @param segmentMetadata metadata of the segment to process; also supplies the segment index directory
+ * @param indexLoadingConfig source of the configured JSON index columns, copied into a mutable HashSet
+ */
+ public JsonIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getJsonIndexColumns());
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ // Must agree with updateIndices(): report work only when it would actually remove or create an index. A plain
+ // set comparison also fires for configured columns that do not exist in this segment, which updateIndices()
+ // skips, so such a segment would be reported as needing reprocessing on every load.
+ Set<String> columnsToAddIdx = new HashSet<>(_columnsToAddIdx);
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
+ // An existing JSON index on a column that is no longer configured has to be removed.
+ for (String column : existingColumns) {
+ if (!columnsToAddIdx.remove(column)) {
+ return true;
+ }
+ }
+ // A configured column without an index only gets one if the column exists in this segment.
+ for (String column : columnsToAddIdx) {
+ if (_segmentMetadata.getColumnMetadataFor(column) != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
// Remove indices not set in table config any more
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.JSON_INDEX);
for (String column : existingColumns) {
if (!_columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing json index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.JSON_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.JSON_INDEX);
LOGGER.info("Removed existing json index from segment: {}, column: {}", segmentName, column);
}
}
for (String column : _columnsToAddIdx) {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
- createJsonIndexForColumn(columnMetadata);
+ createJsonIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
- private void createJsonIndexForColumn(ColumnMetadata columnMetadata)
+ /**
+ * Builds the JSON index for one single-value STRING or JSON column, guarded by an ".inprogress" marker file in
+ * the segment index directory. For v3 segments, the generated file is then written into the segment directory
+ * through LoaderUtils.writeIndexToV3Format.
+ */
+ private void createJsonIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ JsonIndexCreatorProvider indexCreatorProvider)
throws Exception {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
String columnName = columnMetadata.getColumnName();
- File inProgress = new File(_indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION + ".inprogress");
- File jsonIndexFile = new File(_indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ File inProgress = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION + ".inprogress");
+ File jsonIndexFile = new File(indexDir, columnName + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -108,14 +110,14 @@ public class JsonIndexHandler implements IndexHandler {
|| columnMetadata.getDataType() == DataType.JSON),
"Json index can only be applied to single-value STRING or JSON columns");
+ // Dictionary-encoded and raw columns are read differently from the forward index.
if (columnMetadata.hasDictionary()) {
- handleDictionaryBasedColumn(columnMetadata);
+ handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
} else {
- handleNonDictionaryBasedColumn(columnMetadata);
+ handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
// For v3, write the generated json index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, columnName, jsonIndexFile, ColumnIndexType.JSON_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, jsonIndexFile, ColumnIndexType.JSON_INDEX);
}
// Delete the marker file.
@@ -124,13 +126,15 @@ public class JsonIndexHandler implements IndexHandler {
LOGGER.info("Created json index for segment: {}, column: {}", segmentName, columnName);
}
- private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ /**
+ * Reads the dictionary id of every document from the forward index and feeds the corresponding value into a JSON
+ * index creator. The rest of the loop body is elided from this diff. All readers and the creator are closed by
+ * try-with-resources.
+ */
+ private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ JsonIndexCreatorProvider indexCreatorProvider)
throws IOException {
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ File indexDir = _segmentMetadata.getIndexDir();
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata);
- JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
- .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+ Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata);
+ JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+ .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
int dictId = forwardIndexReader.getDictId(i, readerContext);
@@ -140,12 +144,14 @@ public class JsonIndexHandler implements IndexHandler {
}
}
- private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ JsonIndexCreatorProvider indexCreatorProvider)
throws IOException {
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ File indexDir = _segmentMetadata.getIndexDir();
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- JsonIndexCreator jsonIndexCreator = _indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
- .withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
+ JsonIndexCreator jsonIndexCreator = indexCreatorProvider.newJsonIndexCreator(IndexCreationContext.builder()
+ .withIndexDir(indexDir).withColumnMetadata(columnMetadata).build().forJsonIndex())) {
int numDocs = columnMetadata.getTotalDocs();
for (int i = 0; i < numDocs; i++) {
jsonIndexCreator.add(forwardIndexReader.getString(i, readerContext));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
index 3d5551d..95745aa 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.RangeIndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator;
@@ -45,33 +46,32 @@ import org.slf4j.LoggerFactory;
public class RangeIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexHandler.class);
- private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final int _rangeIndexVersion;
- private final RangeIndexCreatorProvider _indexCreatorProvider;
- public RangeIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, RangeIndexCreatorProvider indexCreatorProvider) {
- _indexDir = indexDir;
+ /**
+ * Creates a handler that reconciles the range indices of one segment with the configured columns. The segment
+ * writer and index creator provider are passed to updateIndices() instead of being held.
+ *
+ * @param segmentMetadata metadata of the segment to process; also supplies the segment index directory
+ * @param indexLoadingConfig source of the configured range index columns (copied into a mutable HashSet) and of
+ * the range index version used when creating new indices
+ */
+ public RangeIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getRangeIndexColumns());
_rangeIndexVersion = indexLoadingConfig.getRangeIndexVersion();
- _indexCreatorProvider = indexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
+ return !existingColumns.equals(_columnsToAddIdx);
+ }
+
+ /**
+ * Removes range indices from columns that are no longer configured and creates them for configured, unsorted
+ * columns of this segment that do not have one yet.
+ */
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws IOException {
// Remove indices not set in table config any more
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.RANGE_INDEX);
for (String column : existingColumns) {
+ // NOTE(review): remove() mutates the _columnsToAddIdx field itself (H3IndexHandler uses a local copy), so
+ // after this call the instance no longer holds the configured column set; later needUpdateIndices() or
+ // updateIndices() calls on the same instance see the reduced set.
if (!_columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing range index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
LOGGER.info("Removed existing range index from segment: {}, column: {}", segmentName, column);
}
}
@@ -79,17 +79,19 @@ public class RangeIndexHandler implements IndexHandler {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
// Only create range index on dictionary-encoded unsorted columns
+ // NOTE(review): the condition below does not check hasDictionary(), and createRangeIndexForColumn also
+ // handles raw columns, so "dictionary-encoded" in the comment above looks stale.
if (columnMetadata != null && !columnMetadata.isSorted()) {
- createRangeIndexForColumn(columnMetadata);
+ createRangeIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
- private void createRangeIndexForColumn(ColumnMetadata columnMetadata)
+ /**
+ * Builds the bitmap range index for one column, guarded by a ".range.inprogress" marker file in the segment index
+ * directory. For v3 segments, the generated file is then written into the segment directory through
+ * LoaderUtils.writeIndexToV3Format. The marker is deleted on success.
+ */
+ private void createRangeIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ RangeIndexCreatorProvider indexCreatorProvider)
throws IOException {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
- String column = columnMetadata.getColumnName();
- File inProgress = new File(_indexDir, column + ".range.inprogress");
- File rangeIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
+ String columnName = columnMetadata.getColumnName();
+ File inProgress = new File(indexDir, columnName + ".range.inprogress");
+ File rangeIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
@@ -103,30 +105,31 @@ public class RangeIndexHandler implements IndexHandler {
}
// Create new range index for the column.
- LOGGER.info("Creating new range index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Creating new range index for segment: {}, column: {}", segmentName, columnName);
+ // Dictionary-encoded and raw columns are read differently from the forward index.
if (columnMetadata.hasDictionary()) {
- handleDictionaryBasedColumn(columnMetadata);
+ handleDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
} else {
- handleNonDictionaryBasedColumn(columnMetadata);
+ handleNonDictionaryBasedColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
// For v3, write the generated range index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
- LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, columnName, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
}
// Delete the marker file.
FileUtils.deleteQuietly(inProgress);
- LOGGER.info("Created range index for segment: {}, column: {}", segmentName, column);
+ LOGGER.info("Created range index for segment: {}, column: {}", segmentName, columnName);
}
- private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ /**
+ * Populates a range index creator from the forward index of a dictionary-encoded column. Single-value and
+ * multi-value columns take separate paths, partly elided from this diff. The reader, its context and the creator
+ * are closed by try-with-resources.
+ */
+ private void handleDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ RangeIndexCreatorProvider indexCreatorProvider)
throws IOException {
int numDocs = columnMetadata.getTotalDocs();
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
+ CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, indexCreatorProvider)) {
if (columnMetadata.isSingleValue()) {
// Single-value column
for (int i = 0; i < numDocs; i++) {
@@ -144,12 +147,13 @@ public class RangeIndexHandler implements IndexHandler {
}
}
- private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+ /**
+ * Populates a range index creator from the raw (non-dictionary) forward index of a column. Single-value columns
+ * dispatch on the data type; the per-type cases are elided from this diff. The reader, its context and the
+ * creator are closed by try-with-resources.
+ */
+ private void handleNonDictionaryBasedColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ RangeIndexCreatorProvider indexCreatorProvider)
throws IOException {
int numDocs = columnMetadata.getTotalDocs();
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata)) {
+ CombinedInvertedIndexCreator rangeIndexCreator = newRangeIndexCreator(columnMetadata, indexCreatorProvider)) {
if (columnMetadata.isSingleValue()) {
// Single-value column.
switch (columnMetadata.getDataType()) {
@@ -216,10 +220,12 @@ public class RangeIndexHandler implements IndexHandler {
}
}
- private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata)
+ /**
+ * Asks the provider for a range index creator that writes into the segment index directory. The creator is
+ * configured with the configured range index version and the column's min/max values from its metadata.
+ */
+ private CombinedInvertedIndexCreator newRangeIndexCreator(ColumnMetadata columnMetadata,
+ RangeIndexCreatorProvider indexCreatorProvider)
throws IOException {
- return _indexCreatorProvider.newRangeIndexCreator(
- IndexCreationContext.builder().withIndexDir(_indexDir).withColumnMetadata(columnMetadata).build()
+ File indexDir = _segmentMetadata.getIndexDir();
+ return indexCreatorProvider.newRangeIndexCreator(
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata).build()
.forRangeIndex(_rangeIndexVersion, columnMetadata.getMinValue(), columnMetadata.getMaxValue()));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
index 6e7c64e..9f81933 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java
@@ -47,6 +47,7 @@ import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.TextIndexCreatorProvider;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -82,31 +83,30 @@ import org.slf4j.LoggerFactory;
public class TextIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TextIndexHandler.class);
- private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
- private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
- private final TextIndexCreatorProvider _textIndexCreatorProvider;
- public TextIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
- SegmentDirectory.Writer segmentWriter, TextIndexCreatorProvider textIndexCreatorProvider) {
- _indexDir = indexDir;
+ public TextIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
- _segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getTextIndexColumns());
- _textIndexCreatorProvider = textIndexCreatorProvider;
}
@Override
- public void updateIndices()
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
+ Set<String> existingColumns = segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
+ return !existingColumns.equals(_columnsToAddIdx);
+ }
+
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
// Remove indices not set in table config any more
String segmentName = _segmentMetadata.getName();
- Set<String> existingColumns = _segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
+ Set<String> existingColumns = segmentWriter.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.TEXT_INDEX);
for (String column : existingColumns) {
if (!_columnsToAddIdx.remove(column)) {
LOGGER.info("Removing existing text index from segment: {}, column: {}", segmentName, column);
- _segmentWriter.removeIndex(column, ColumnIndexType.TEXT_INDEX);
+ segmentWriter.removeIndex(column, ColumnIndexType.TEXT_INDEX);
LOGGER.info("Removed existing text index from segment: {}, column: {}", segmentName, column);
}
}
@@ -114,7 +114,7 @@ public class TextIndexHandler implements IndexHandler {
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null) {
checkUnsupportedOperationsForTextIndex(columnMetadata);
- createTextIndexForColumn(columnMetadata);
+ createTextIndexForColumn(segmentWriter, columnMetadata, indexCreatorProvider);
}
}
}
@@ -131,38 +131,42 @@ public class TextIndexHandler implements IndexHandler {
}
}
- private void createTextIndexForColumn(ColumnMetadata columnMetadata)
+ private void createTextIndexForColumn(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
+ TextIndexCreatorProvider indexCreatorProvider)
throws Exception {
+ File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
- String column = columnMetadata.getColumnName();
+ String columnName = columnMetadata.getColumnName();
int numDocs = columnMetadata.getTotalDocs();
boolean hasDictionary = columnMetadata.hasDictionary();
- LOGGER.info("Creating new text index for column: {} in segment: {}, hasDictionary: {}", column, segmentName,
+ LOGGER.info("Creating new text index for column: {} in segment: {}, hasDictionary: {}", columnName, segmentName,
hasDictionary);
- File segmentDirectory = SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, _segmentMetadata.getVersion());
+ File segmentDirectory = SegmentDirectoryPaths.segmentDirectoryFor(indexDir, _segmentMetadata.getVersion());
// The handlers are always invoked by the preprocessor. Before this ImmutableSegmentLoader would have already
// up-converted the segment from v1/v2 -> v3 (if needed). So based on the segmentVersion, whatever segment
// segmentDirectory is indicated to us by SegmentDirectoryPaths, we create lucene index there. There is no
// further need to move around the lucene index directory since it is created with correct directory structure
// based on segmentVersion.
- try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(_segmentWriter, columnMetadata);
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(segmentWriter, columnMetadata);
ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
- TextIndexCreator textIndexCreator = _textIndexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
+ TextIndexCreator textIndexCreator = indexCreatorProvider.newTextIndexCreator(IndexCreationContext.builder()
.withColumnMetadata(columnMetadata).withIndexDir(segmentDirectory).build().forTextIndex(true))) {
if (columnMetadata.isSingleValue()) {
- processSVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
+ processSVField(segmentWriter, hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs,
+ columnMetadata);
} else {
- processMVField(hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs, columnMetadata);
+ processMVField(segmentWriter, hasDictionary, forwardIndexReader, readerContext, textIndexCreator, numDocs,
+ columnMetadata);
}
textIndexCreator.seal();
}
- LOGGER.info("Created text index for column: {} in segment: {}", column, segmentName);
+ LOGGER.info("Created text index for column: {} in segment: {}", columnName, segmentName);
}
- private void processSVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
- ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
- ColumnMetadata columnMetadata)
+ private void processSVField(SegmentDirectory.Writer segmentWriter, boolean hasDictionary,
+ ForwardIndexReader forwardIndexReader, ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator,
+ int numDocs, ColumnMetadata columnMetadata)
throws IOException {
if (!hasDictionary) {
// text index on raw column, just read the raw forward index
@@ -173,7 +177,7 @@ public class TextIndexHandler implements IndexHandler {
// text index on dictionary encoded SV column
// read forward index to get dictId
// read the raw value from dictionary using dictId
- try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+ try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
for (int docId = 0; docId < numDocs; docId++) {
int dictId = forwardIndexReader.getDictId(docId, readerContext);
textIndexCreator.add(dictionary.getStringValue(dictId));
@@ -182,9 +186,9 @@ public class TextIndexHandler implements IndexHandler {
}
}
- private void processMVField(boolean hasDictionary, ForwardIndexReader forwardIndexReader,
- ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator, int numDocs,
- ColumnMetadata columnMetadata)
+ private void processMVField(SegmentDirectory.Writer segmentWriter, boolean hasDictionary,
+ ForwardIndexReader forwardIndexReader, ForwardIndexReaderContext readerContext, TextIndexCreator textIndexCreator,
+ int numDocs, ColumnMetadata columnMetadata)
throws IOException {
if (!hasDictionary) {
// text index on raw column, just read the raw forward index
@@ -197,7 +201,7 @@ public class TextIndexHandler implements IndexHandler {
// text index on dictionary encoded MV column
// read forward index to get dictId
// read the raw value from dictionary using dictId
- try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
+ try (Dictionary dictionary = LoaderUtils.getDictionary(segmentWriter, columnMetadata)) {
int maxNumEntries = columnMetadata.getMaxNumberOfMultiValues();
int[] dictIdBuffer = new int[maxNumEntries];
String[] valueBuffer = new String[maxNumEntries];
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index 820317d..8ec95e0 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -62,6 +62,8 @@ import org.testng.annotations.Test;
import org.testng.collections.Lists;
import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
public class LoaderTest {
@@ -144,6 +146,31 @@ public class LoaderTest {
Assert.assertFalse(v3TempDir.exists());
}
+ @Test
+ public void testIfNeedConvertSegmentFormat()
+ throws Exception {
+ constructV1Segment();
+
+ // The newly generated segment is consistent with table config and schema, thus
+ // in follow checks, whether it needs reprocess or not depends on segment format.
+ SegmentDirectory segmentDir = _localSegmentDirectoryLoader.load(_indexDir.toURI(),
+ new SegmentDirectoryLoaderContext(null, null, null, _pinotConfiguration));
+
+ // The segmentVersionToLoad is null, not leading to reprocess.
+ assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, new IndexLoadingConfig(), null));
+
+ // The segmentVersionToLoad is v1, not leading to reprocess.
+ assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, _v1IndexLoadingConfig, null));
+
+ // The segmentVersionToLoad is v3, leading to reprocess.
+ assertTrue(ImmutableSegmentLoader.needPreprocess(segmentDir, _v3IndexLoadingConfig, null));
+
+ // The segment is in v3 format now, not leading to reprocess.
+ ImmutableSegmentLoader.load(_indexDir, _v3IndexLoadingConfig);
+ segmentDir.reloadMetadata();
+ assertFalse(ImmutableSegmentLoader.needPreprocess(segmentDir, _v3IndexLoadingConfig, null));
+ }
+
private void testConversion()
throws Exception {
// Do not set segment version, should not convert the segment
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index f985ba9..df231b3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -49,6 +50,7 @@ import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
@@ -783,16 +785,7 @@ public class SegmentPreProcessorTest {
constructV1Segment();
// Remove min/max value from the metadata
- PropertiesConfiguration configuration = SegmentMetadataImpl.getPropertiesConfiguration(_indexDir);
- Iterator<String> keys = configuration.getKeys();
- while (keys.hasNext()) {
- String key = keys.next();
- if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
- V1Constants.MetadataKeys.Column.MAX_VALUE)) {
- configuration.clearProperty(key);
- }
- }
- configuration.save();
+ removeMinMaxValuesFromMetadataFile(_indexDir);
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE);
@@ -1080,4 +1073,129 @@ public class SegmentPreProcessorTest {
}
assertEquals(singleFileIndex.length(), initFileSize);
}
+
+ @Test
+ public void testV1IfNeedProcess()
+ throws Exception {
+ constructV1Segment();
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1);
+
+ testIfNeedProcess();
+ }
+
+ @Test
+ public void testV3IfNeedProcess()
+ throws Exception {
+ constructV3Segment();
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
+
+ testIfNeedProcess();
+ }
+
+ private void testIfNeedProcess()
+ throws Exception {
+ // There are a few indices initially. Require to remove them with an empty IndexLoadingConfig.
+ try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+ SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
+ assertTrue(processor.needProcess());
+ processor.process();
+ assertFalse(processor.needProcess());
+ }
+
+ // Require to add some default columns with new schema.
+ try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+ SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(),
+ _newColumnsSchemaWithH3Json)) {
+ assertTrue(processor.needProcess());
+ processor.process();
+ assertFalse(processor.needProcess());
+ }
+
+ // Require to add different types of indices. Add one new index a time
+ // to test the index handlers separately.
+ IndexLoadingConfig config = new IndexLoadingConfig();
+ for (Runnable prepFunc : createConfigPrepFunctions(config)) {
+ prepFunc.run();
+ try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+ SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, config,
+ _newColumnsSchemaWithH3Json)) {
+ assertTrue(processor.needProcess());
+ processor.process();
+ assertFalse(processor.needProcess());
+ }
+ }
+
+ // Require to add startree index.
+ IndexingConfig indexingConfig = new IndexingConfig();
+ indexingConfig.setEnableDynamicStarTreeCreation(true);
+ indexingConfig.setEnableDefaultStarTree(true);
+ _tableConfig.setIndexingConfig(indexingConfig);
+ IndexLoadingConfig configWithStarTreeIndex = new IndexLoadingConfig(null, _tableConfig);
+ createConfigPrepFunctions(configWithStarTreeIndex).forEach(Runnable::run);
+ try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+ SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
+ _newColumnsSchemaWithH3Json)) {
+ assertTrue(processor.needProcess());
+ processor.process();
+ assertFalse(processor.needProcess());
+ }
+
+ // Require to update min and max values.
+ removeMinMaxValuesFromMetadataFile(_indexDir);
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
+ assertNull(v.getMinValue(), "checking column: " + k);
+ assertNull(v.getMaxValue(), "checking column: " + k);
+ });
+ try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext(null, null, null, _configuration));
+ SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithStarTreeIndex,
+ _newColumnsSchemaWithH3Json)) {
+ assertTrue(processor.needProcess());
+ processor.process();
+ }
+ segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ segmentMetadata.getColumnMetadataMap().forEach((k, v) -> {
+ if (v.hasDictionary()) {
+ assertNotNull(v.getMinValue(), "checking column: " + k);
+ assertNotNull(v.getMaxValue(), "checking column: " + k);
+ } else {
+ assertNull(v.getMinValue(), "checking column: " + k);
+ assertNull(v.getMaxValue(), "checking column: " + k);
+ }
+ });
+ }
+
+ private static void removeMinMaxValuesFromMetadataFile(File indexDir)
+ throws Exception {
+ PropertiesConfiguration configuration = SegmentMetadataImpl.getPropertiesConfiguration(indexDir);
+ Iterator<String> keys = configuration.getKeys();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ if (key.endsWith(V1Constants.MetadataKeys.Column.MIN_VALUE) || key.endsWith(
+ V1Constants.MetadataKeys.Column.MAX_VALUE)) {
+ configuration.clearProperty(key);
+ }
+ }
+ configuration.save();
+ }
+
+ private static List<Runnable> createConfigPrepFunctions(IndexLoadingConfig config) {
+ return Arrays.asList(
+ () -> config.setInvertedIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+ () -> config.setRangeIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+ () -> config.setTextIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+ () -> config.setFSTIndexColumns(new HashSet<>(Collections.singletonList("column3"))),
+ () -> config.setBloomFilterConfigs(ImmutableMap.of("column3", new BloomFilterConfig(0.1, 1024, true))),
+ () -> config
+ .setH3IndexConfigs(ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))),
+ () -> config.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol")))
+ );
+ }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
index 60de836..7f0dc76 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java
@@ -157,6 +157,10 @@ public abstract class SegmentDirectory implements Closeable {
public abstract boolean hasIndexFor(String column, ColumnIndexType type);
+ public SegmentDirectory toSegmentDirectory() {
+ return SegmentDirectory.this;
+ }
+
public abstract String toString();
}
@@ -192,10 +196,6 @@ public abstract class SegmentDirectory implements Closeable {
public abstract void save()
throws IOException;
- public SegmentDirectory toSegmentDirectory() {
- return SegmentDirectory.this;
- }
-
public abstract String toString();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org