Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/29 21:06:49 UTC
[pinot] branch master updated: ForwardIndexHandler: Change compressionType during segmentReload (#9454)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e917bdc5b7 ForwardIndexHandler: Change compressionType during segmentReload (#9454)
e917bdc5b7 is described below
commit e917bdc5b71df66f46cb805d3a409b7850f2b7a0
Author: Vivek Iyer Vaidyanathan <vv...@gmail.com>
AuthorDate: Thu Sep 29 14:06:41 2022 -0700
ForwardIndexHandler: Change compressionType during segmentReload (#9454)
* ForwardIndexHandler: Allow changing compressionType for SV columns on reload
* Address review comments.
* Add more tests and address review comments
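
As an illustration of how this change is exercised, the new compression type is declared through the table
config's fieldConfigList and picked up on segment reload. A minimal sketch, adapted from the new
ForwardIndexHandlerTest in this patch (table and column names are hypothetical):

import java.util.Collections;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

// Hypothetical raw (no-dictionary) SV column whose compression is switched to ZSTANDARD.
FieldConfig fieldConfig = new FieldConfig("myRawColumn", FieldConfig.EncodingType.RAW,
    Collections.emptyList(), FieldConfig.CompressionCodec.ZSTANDARD, null);
TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
    .setTableName("myTable")
    .setNoDictionaryColumns(Collections.singletonList("myRawColumn"))
    .setFieldConfigList(Collections.singletonList(fieldConfig))
    .build();
// On segment reload, IndexLoadingConfig extracts this override and ForwardIndexHandler rewrites
// the column's raw forward index with the new compression type.
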
---
.../segment/index/loader/ForwardIndexHandler.java | 273 ++++++++++++
.../local/segment/index/loader/IndexHandler.java | 3 +-
.../segment/index/loader/IndexHandlerFactory.java | 2 +
.../segment/index/loader/IndexLoadingConfig.java | 55 +++
.../segment/index/loader/SegmentPreProcessor.java | 4 +-
.../forward/BaseChunkForwardIndexReader.java | 20 +-
.../index/loader/ForwardIndexHandlerTest.java | 475 +++++++++++++++++++++
.../index/loader/SegmentPreProcessorTest.java | 154 +++++--
.../spi/index/reader/ForwardIndexReader.java | 19 +
.../pinot/segment/spi/store/ColumnIndexType.java | 3 +
10 files changed, 969 insertions(+), 39 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
new file mode 100644
index 0000000000..d9c317e2e6
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreationContext;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.SegmentVersion;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.ColumnIndexType;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. Note
+ * that this handler only works for segment versions >= 3.0. Support for segment versions < 3.0 is not added because
+ * the majority of the use cases are on versions >= 3.0 and this avoids adding tech debt. The currently supported
+ * operations are:
+ * 1. Change compression on raw SV columns.
+ *
+ * TODO: Add support for the following:
+ * 1. Change compression for raw MV columns
+ * 2. Enable dictionary
+ * 3. Disable dictionary
+ */
+public class ForwardIndexHandler implements IndexHandler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);
+
+ private final SegmentMetadata _segmentMetadata;
+ private final IndexLoadingConfig _indexLoadingConfig;
+
+ protected enum Operation {
+ // TODO: Add other operations like ENABLE_DICTIONARY, DISABLE_DICTIONARY.
+ CHANGE_RAW_INDEX_COMPRESSION_TYPE,
+ }
+
+ public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
+ _segmentMetadata = segmentMetadata;
+ _indexLoadingConfig = indexLoadingConfig;
+ }
+
+ @Override
+ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
+ throws Exception {
+ Map<String, Operation> columnOperationMap = computeOperation(segmentReader);
+ return !columnOperationMap.isEmpty();
+ }
+
+ @Override
+ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Map<String, Operation> columnOperationMap = computeOperation(segmentWriter);
+ if (columnOperationMap.isEmpty()) {
+ return;
+ }
+
+ for (Map.Entry<String, Operation> entry : columnOperationMap.entrySet()) {
+ String column = entry.getKey();
+ Operation operation = entry.getValue();
+
+ switch (operation) {
+ case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
+ rewriteRawForwardIndex(column, segmentWriter, indexCreatorProvider);
+ break;
+ // TODO: Add other operations here.
+ default:
+ throw new IllegalStateException("Unsupported operation for column " + column);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
+ throws Exception {
+ Map<String, Operation> columnOperationMap = new HashMap<>();
+
+ // Does not work for segment versions < V3
+ if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
+ return columnOperationMap;
+ }
+
+ // From existing column config.
+ Set<String> existingAllColumns = _segmentMetadata.getAllColumns();
+ Set<String> existingDictColumns =
+ segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.DICTIONARY);
+ Set<String> existingNoDictColumns = new HashSet<>();
+ for (String column : existingAllColumns) {
+ if (!existingDictColumns.contains(column)) {
+ existingNoDictColumns.add(column);
+ }
+ }
+
+ // From new column config.
+ Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();
+
+ for (String column : existingAllColumns) {
+ if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
+ // The column is RAW forward index encoded in both the existing segment and the new config. Check if compression needs to be changed.
+ if (shouldChangeCompressionType(column, segmentReader)) {
+ columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+ }
+ }
+ }
+
+ return columnOperationMap;
+ }
+
+ private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception {
+ ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+
+ // TODO: Remove this MV column limitation.
+ if (!existingColMetadata.isSingleValue()) {
+ return false;
+ }
+
+ // The compression type for an existing segment can only be determined by reading the forward index header.
+ try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata)) {
+ ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
+ Preconditions.checkState(existingCompressionType != null,
+ "Existing compressionType cannot be null for raw forward index column=" + column);
+
+ // Get the new compression type.
+ ChunkCompressionType newCompressionType = null;
+ Map<String, ChunkCompressionType> newCompressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+ if (newCompressionConfigs.containsKey(column)) {
+ newCompressionType = newCompressionConfigs.get(column);
+ }
+
+ // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
+ // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
+ // forward indexes during segmentReload when the default compressionType changes.
+ if (newCompressionType == null || existingCompressionType == newCompressionType) {
+ return false;
+ }
+
+ return true;
+ }
+ }
+
+ private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
+ IndexCreatorProvider indexCreatorProvider)
+ throws Exception {
+ Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);
+
+ ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
+ File indexDir = _segmentMetadata.getIndexDir();
+ String segmentName = _segmentMetadata.getName();
+ File inProgress = new File(indexDir, column + ".fwd.inprogress");
+ File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+
+ if (!inProgress.exists()) {
+ // Marker file does not exist, which means last run ended normally.
+ // Create a marker file.
+ FileUtils.touch(inProgress);
+ } else {
+ // Marker file exists, which means last run was interrupted.
+ // Remove forward index if exists.
+ FileUtils.deleteQuietly(fwdIndexFile);
+ }
+
+ LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
+
+ Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
+ Preconditions.checkState(compressionConfigs.containsKey(column));
+ // At this point, compressionConfigs is guaranteed to contain the column. If there's no entry in the map, we
+ // wouldn't have computed the CHANGE_RAW_INDEX_COMPRESSION_TYPE operation for this column as compressionType changes
+ // are processed only if a valid compressionType is specified in fieldConfig.
+ ChunkCompressionType newCompressionType = compressionConfigs.get(column);
+
+ int numDocs = existingColMetadata.getTotalDocs();
+
+ try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
+ int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
+ Preconditions.checkState(lengthOfLongestEntry >= 0,
+ "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
+
+ IndexCreationContext.Forward context =
+ IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
+ .withLengthOfLongestEntry(lengthOfLongestEntry).build()
+ .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());
+
+ try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
+ // If the creator's value type and the reader's stored type do not match, throw an exception.
+ if (!reader.getStoredType().equals(creator.getValueType())) {
+ String failureMsg =
+ "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
+ .toString() + " to " + creator.getValueType().toString();
+ throw new UnsupportedOperationException(failureMsg);
+ }
+
+ PinotSegmentColumnReader columnReader =
+ new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
+
+ for (int i = 0; i < numDocs; i++) {
+ Object val = columnReader.getValue(i);
+
+ // JSON fields are either stored as string or bytes. No special handling is needed because we make this
+ // decision based on the storedType of the reader.
+ switch (reader.getStoredType()) {
+ case INT:
+ creator.putInt((int) val);
+ break;
+ case LONG:
+ creator.putLong((long) val);
+ break;
+ case FLOAT:
+ creator.putFloat((float) val);
+ break;
+ case DOUBLE:
+ creator.putDouble((double) val);
+ break;
+ case STRING:
+ creator.putString((String) val);
+ break;
+ case BYTES:
+ creator.putBytes((byte[]) val);
+ break;
+ case BIG_DECIMAL:
+ creator.putBigDecimal((BigDecimal) val);
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+ }
+ }
+
+ // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
+ // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
+ // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
+ // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
+ // called during segmentWriter.close().
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
+ LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);
+
+ // Delete the marker file.
+ FileUtils.deleteQuietly(inProgress);
+
+ LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
index 5b2ba4706f..76132a7767 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandler.java
@@ -37,5 +37,6 @@ public interface IndexHandler {
* Check if there is a need to add new indices or removes obsolete indices.
* @return true if there is a need to update.
*/
- boolean needUpdateIndices(SegmentDirectory.Reader segmentReader);
+ boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
+ throws Exception;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
index bc48ff9771..6be28d8bb9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java
@@ -63,6 +63,8 @@ public class IndexHandlerFactory {
return new H3IndexHandler(segmentMetadata, indexLoadingConfig);
case BLOOM_FILTER:
return new BloomFilterHandler(segmentMetadata, indexLoadingConfig);
+ case FORWARD_INDEX:
+ return new ForwardIndexHandler(segmentMetadata, indexLoadingConfig);
default:
return NO_OP_HANDLER;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index d5c4b0d7bc..78dc118a78 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
@@ -75,6 +76,7 @@ public class IndexLoadingConfig {
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private boolean _enableDefaultStarTree;
+ private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>();
private SegmentVersion _segmentVersion;
private ColumnMinMaxValueGeneratorMode _columnMinMaxValueGeneratorMode = ColumnMinMaxValueGeneratorMode.DEFAULT_MODE;
@@ -153,6 +155,7 @@ public class IndexLoadingConfig {
}
}
+ extractCompressionConfigs(tableConfig);
extractTextIndexColumnsFromTableConfig(tableConfig);
extractFSTIndexColumnsFromTableConfig(tableConfig);
extractH3IndexConfigsFromTableConfig(tableConfig);
@@ -215,6 +218,28 @@ public class IndexLoadingConfig {
}
}
+ /**
+ * Extracts compressionType for each column. Populates a map containing column name as key and compression type as
+ * value. This map only contains the explicit compressionType overrides from the table config; it does not include
+ * the default compressionType (derived using SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column.
+ * Note that only RAW forward index columns will be populated in this map.
+ * @param tableConfig table config
+ */
+ private void extractCompressionConfigs(TableConfig tableConfig) {
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ if (fieldConfigList == null) {
+ return;
+ }
+
+ for (FieldConfig fieldConfig : fieldConfigList) {
+ String column = fieldConfig.getName();
+ if (fieldConfig.getCompressionCodec() != null) {
+ ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name());
+ _compressionConfigs.put(column, compressionType);
+ }
+ }
+ }
+
/**
* Text index creation info for each column is specified
* using {@link FieldConfig} model of indicating per column
@@ -371,6 +396,24 @@ public class IndexLoadingConfig {
_invertedIndexColumns = invertedIndexColumns;
}
+ /**
+ * For tests only.
+ * Used by SegmentPreProcessorTest to set raw columns.
+ */
+ @VisibleForTesting
+ public void setNoDictionaryColumns(Set<String> noDictionaryColumns) {
+ _noDictionaryColumns = noDictionaryColumns;
+ }
+
+ /**
+ * For tests only.
+ * Used by SegmentPreProcessorTest to set compression configs.
+ */
+ @VisibleForTesting
+ public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) {
+ _compressionConfigs = compressionConfigs;
+ }
+
/**
* For tests only.
*/
@@ -424,6 +467,18 @@ public class IndexLoadingConfig {
return _noDictionaryColumns;
}
+ /**
+ * Returns a map containing column name as key and compressionType as value. This map only contains the explicit
+ * compressionType overrides from the table config; it does not include the default compressionType (derived using
+ * SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column. Note that only RAW forward index
+ * columns will be populated in this map.
+ *
+ * @return a map containing column name as key and compressionType as value.
+ */
+ public Map<String, ChunkCompressionType> getCompressionConfigs() {
+ return _compressionConfigs;
+ }
+
public Map<String, String> getnoDictionaryConfig() {
return _noDictionaryConfig;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
index 202f41c2b2..3da4c2b524 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java
@@ -104,8 +104,8 @@ public class SegmentPreProcessor implements AutoCloseable {
// Update single-column indices, like inverted index, json index etc.
IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
for (ColumnIndexType type : ColumnIndexType.values()) {
- IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
- .updateIndices(segmentWriter, indexCreatorProvider);
+ IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig);
+ handler.updateIndices(segmentWriter, indexCreatorProvider);
}
// Create/modify/remove star-trees if required.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index beba74f9a2..e37fb4a987 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -48,6 +48,7 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
protected final int _numDocsPerChunk;
protected final int _lengthOfLongestEntry;
protected final boolean _isCompressed;
+ protected final ChunkCompressionType _compressionType;
protected final ChunkDecompressor _chunkDecompressor;
protected final PinotDataBuffer _dataHeader;
protected final int _headerEntryChunkOffsetSize;
@@ -79,15 +80,16 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
_dataBuffer.getInt(headerOffset); // Total docs
headerOffset += Integer.BYTES;
- ChunkCompressionType compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
- _chunkDecompressor = ChunkCompressorFactory.getDecompressor(compressionType);
- _isCompressed = !compressionType.equals(ChunkCompressionType.PASS_THROUGH);
+ _compressionType = ChunkCompressionType.valueOf(_dataBuffer.getInt(headerOffset));
+ _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType);
+ _isCompressed = !_compressionType.equals(ChunkCompressionType.PASS_THROUGH);
headerOffset += Integer.BYTES;
dataHeaderStart = _dataBuffer.getInt(headerOffset);
} else {
_isCompressed = true;
- _chunkDecompressor = ChunkCompressorFactory.getDecompressor(ChunkCompressionType.SNAPPY);
+ _compressionType = ChunkCompressionType.SNAPPY;
+ _chunkDecompressor = ChunkCompressorFactory.getDecompressor(_compressionType);
}
_headerEntryChunkOffsetSize = BaseChunkSVForwardIndexWriter.getHeaderEntryChunkOffsetSize(version);
@@ -174,6 +176,16 @@ public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<
return _storedType;
}
+ @Override
+ public ChunkCompressionType getCompressionType() {
+ return _compressionType;
+ }
+
+ @Override
+ public int getLengthOfLongestEntry() {
+ return _lengthOfLongestEntry;
+ }
+
@Override
public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
if (_storedType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
new file mode 100644
index 0000000000..12bb8b61a2
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.loader;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
+import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.index.IndexingOverrides;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class ForwardIndexHandlerTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerTest");
+ private static final String TABLE_NAME = "myTable";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ // TODO:
+ // 1. Add other datatypes (double, float, bigdecimal, bytes). Also add MV columns.
+ // 2. Add text index and other index types for raw columns.
+ private static final String DIM_SNAPPY_STRING = "DIM_SNAPPY_STRING";
+ private static final String DIM_PASS_THROUGH_STRING = "DIM_PASS_THROUGH_STRING";
+ private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING";
+ private static final String DIM_LZ4_STRING = "DIM_LZ4_STRING";
+
+ private static final String DIM_SNAPPY_LONG = "DIM_SNAPPY_LONG";
+ private static final String DIM_PASS_THROUGH_LONG = "DIM_PASS_THROUGH_LONG";
+ private static final String DIM_ZSTANDARD_LONG = "DIM_ZSTANDARD_LONG";
+ private static final String DIM_LZ4_LONG = "DIM_LZ4_LONG";
+
+ private static final String DIM_SNAPPY_INTEGER = "DIM_SNAPPY_INTEGER";
+ private static final String DIM_PASS_THROUGH_INTEGER = "DIM_PASS_THROUGH_INTEGER";
+ private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER";
+ private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER";
+
+ private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER";
+ private static final String DIM_DICT_STRING = "DIM_DICT_STRING";
+ private static final String DIM_DICT_LONG = "DIM_DICT_LONG";
+
+ private static final String METRIC_PASSTHROUGH_INTEGER = "METRIC_PASSTHROUGH_INTEGER";
+ private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
+ private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER";
+ private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER";
+
+ private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
+ Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, METRIC_SNAPPY_INTEGER);
+
+ private static final List<String> RAW_ZSTANDARD_INDEX_COLUMNS =
+ Arrays.asList(DIM_ZSTANDARD_STRING, DIM_ZSTANDARD_LONG, DIM_ZSTANDARD_INTEGER, METRIC_ZSTANDARD_INTEGER);
+
+ private static final List<String> RAW_PASS_THROUGH_INDEX_COLUMNS =
+ Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER,
+ METRIC_PASSTHROUGH_INTEGER);
+
+ private static final List<String> RAW_LZ4_INDEX_COLUMNS =
+ Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, METRIC_LZ4_INTEGER);
+
+ private final List<String> _noDictionaryColumns = new ArrayList<>();
+ TableConfig _tableConfig;
+ Schema _schema;
+ File _segmentDirectory;
+ private List<FieldConfig.CompressionCodec> _allCompressionTypes =
+ Arrays.asList(FieldConfig.CompressionCodec.values());
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ // Delete the index directory if it already exists.
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ buildSegment();
+ }
+
+ private void buildSegment()
+ throws Exception {
+ List<GenericRow> rows = createTestData();
+
+ List<FieldConfig> fieldConfigs = new ArrayList<>(
+ RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()
+ + RAW_LZ4_INDEX_COLUMNS.size());
+
+ for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
+ fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.SNAPPY, null));
+ }
+
+ for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) {
+ fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.ZSTANDARD, null));
+ }
+
+ for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) {
+ fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.PASS_THROUGH, null));
+ }
+
+ for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
+ fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.LZ4, null));
+ }
+
+ _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
+ _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
+ _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
+ _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+
+ _tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+ .setFieldConfigList(fieldConfigs).build();
+ _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension(DIM_SNAPPY_STRING, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(DIM_PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(DIM_ZSTANDARD_STRING, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(DIM_LZ4_STRING, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(DIM_SNAPPY_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DIM_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DIM_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DIM_LZ4_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DIM_SNAPPY_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(DIM_PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT)
+ .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
+ .addMetric(METRIC_PASSTHROUGH_INTEGER, FieldSpec.DataType.INT)
+ .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
+ .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
+
+ .build();
+
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
+ }
+
+ _segmentDirectory = new File(INDEX_DIR, driver.getSegmentName());
+ }
+
+ private List<GenericRow> createTestData() {
+ List<GenericRow> rows = new ArrayList<>();
+
+ //Generate random data
+ int rowLength = 1000;
+ Random random = new Random();
+ String[] tempStringRows = new String[rowLength];
+ Integer[] tempIntRows = new Integer[rowLength];
+ Long[] tempLongRows = new Long[rowLength];
+
+ for (int i = 0; i < rowLength; i++) {
+ //Adding a fixed value to check for filter queries
+ if (i % 10 == 0) {
+ tempStringRows[i] = "testRow";
+ tempIntRows[i] = 1001;
+ tempLongRows[i] = 1001L;
+ } else {
+ tempStringRows[i] = "n" + i;
+ tempIntRows[i] = i;
+ tempLongRows[i] = (long) i;
+ }
+ }
+
+ for (int i = 0; i < rowLength; i++) {
+ GenericRow row = new GenericRow();
+
+ // Raw String columns
+ row.putValue(DIM_SNAPPY_STRING, tempStringRows[i]);
+ row.putValue(DIM_ZSTANDARD_STRING, tempStringRows[i]);
+ row.putValue(DIM_PASS_THROUGH_STRING, tempStringRows[i]);
+ row.putValue(DIM_LZ4_STRING, tempStringRows[i]);
+
+ // Raw integer columns
+ row.putValue(DIM_SNAPPY_INTEGER, tempIntRows[i]);
+ row.putValue(DIM_ZSTANDARD_INTEGER, tempIntRows[i]);
+ row.putValue(DIM_PASS_THROUGH_INTEGER, tempIntRows[i]);
+ row.putValue(DIM_LZ4_INTEGER, tempIntRows[i]);
+ row.putValue(METRIC_LZ4_INTEGER, tempIntRows[i]);
+ row.putValue(METRIC_PASSTHROUGH_INTEGER, tempIntRows[i]);
+ row.putValue(METRIC_ZSTANDARD_INTEGER, tempIntRows[i]);
+ row.putValue(METRIC_SNAPPY_INTEGER, tempIntRows[i]);
+
+ // Raw long columns
+ row.putValue(DIM_SNAPPY_LONG, tempLongRows[i]);
+ row.putValue(DIM_ZSTANDARD_LONG, tempLongRows[i]);
+ row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]);
+ row.putValue(DIM_LZ4_LONG, tempLongRows[i]);
+
+ // Dictionary columns
+ row.putValue(DIM_DICT_INTEGER, tempIntRows[i]);
+ row.putValue(DIM_DICT_LONG, tempLongRows[i]);
+ row.putValue(DIM_DICT_STRING, tempStringRows[i]);
+
+ rows.add(row);
+ }
+ return rows;
+ }
+
+ @Test
+ public void testComputeOperation()
+ throws Exception {
+ // Setup
+ SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ SegmentDirectory segmentLocalFSDirectory =
+ new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+ // TEST1 : Validate with zero changes. ForwardIndexHandler should be a No-Op.
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+ ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ Map<String, ForwardIndexHandler.Operation> operationMap = new HashMap<>();
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap, Collections.EMPTY_MAP);
+
+ // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. ForwardIndexHandler should be a No-Op.
+ indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+ indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
+ fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap, Collections.EMPTY_MAP);
+
+ // TEST3: Disable dictionary. ForwardIndexHandler should be a No-Op.
+ indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+ indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
+ fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap, Collections.EMPTY_MAP);
+
+ // TEST4: Add random index. ForwardIndexHandler should be a No-Op.
+ indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+ indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
+ indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
+ fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap, Collections.EMPTY_MAP);
+
+ // TEST5: Change compression
+
+ // Create new tableConfig with the modified fieldConfigs.
+ List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+ FieldConfig config = fieldConfigs.remove(0);
+ FieldConfig newConfig = new FieldConfig(config.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.ZSTANDARD, null);
+ fieldConfigs.add(newConfig);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+ .setFieldConfigList(fieldConfigs).build();
+ tableConfig.setFieldConfigList(fieldConfigs);
+
+ indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+ fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap.size(), 1);
+ assertEquals(operationMap.get(config.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+
+ // TEST6: Change compression and add index. Change compressionType for more than 1 column.
+ fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+ FieldConfig config1 = fieldConfigs.remove(0);
+ FieldConfig config2 = fieldConfigs.remove(1);
+
+ FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.ZSTANDARD, null);
+ fieldConfigs.add(newConfig1);
+ FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ FieldConfig.CompressionCodec.ZSTANDARD, null);
+ fieldConfigs.add(newConfig2);
+
+ tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+ .setFieldConfigList(fieldConfigs).build();
+ tableConfig.setFieldConfigList(fieldConfigs);
+
+ indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+ indexLoadingConfig.getTextIndexColumns().add(config1.getName());
+ indexLoadingConfig.getInvertedIndexColumns().add(config1.getName());
+ fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ operationMap = fwdIndexHandler.computeOperation(writer);
+ assertEquals(operationMap.size(), 2);
+ assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+ assertEquals(operationMap.get(config2.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+
+ // Tear down
+ segmentLocalFSDirectory.close();
+ }
+
+ @Test
+ public void testRewriteRawForwardIndexForSingleColumn()
+ throws Exception {
+ for (int i = 0; i < _noDictionaryColumns.size(); i++) {
+
+ // For every noDictionaryColumn, change the compressionType to all available types, one by one.
+ for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
+ // Setup
+ SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ SegmentDirectory segmentLocalFSDirectory =
+ new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+ List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+ FieldConfig config = fieldConfigs.remove(i);
+ String columnName = config.getName();
+ FieldConfig.CompressionCodec newCompressionType = compressionType;
+
+ FieldConfig newConfig =
+ new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType,
+ null);
+ fieldConfigs.add(newConfig);
+
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+ tableConfig.setFieldConfigList(fieldConfigs);
+
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+ IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+ ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+
+ fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+
+ // Tear down before validation, because columns.psf and index map cleanup happens at segmentDirectory.close().
+ segmentLocalFSDirectory.close();
+
+ // Validation
+ validateIndexMap();
+ validateForwardIndex(columnName, newCompressionType);
+ }
+ }
+ }
+
+ @Test
+ public void testRewriteRawForwardIndexForMultipleColumns()
+ throws Exception {
+ // Setup
+ SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ SegmentDirectory segmentLocalFSDirectory =
+ new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+ List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+ Random rand = new Random();
+ int randomIdx = rand.nextInt(_allCompressionTypes.size());
+ FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx);
+
+ // Column 1
+ randomIdx = rand.nextInt(fieldConfigs.size());
+ FieldConfig config1 = fieldConfigs.remove(randomIdx);
+ String column1 = config1.getName();
+ FieldConfig newConfig1 =
+ new FieldConfig(column1, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
+ fieldConfigs.add(newConfig1);
+
+ // Column 2
+ randomIdx = rand.nextInt(fieldConfigs.size());
+ FieldConfig config2 = fieldConfigs.remove(randomIdx);
+ String column2 = config2.getName();
+ FieldConfig newConfig2 =
+ new FieldConfig(column2, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType, null);
+ fieldConfigs.add(newConfig2);
+
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
+ .setFieldConfigList(fieldConfigs).build();
+ tableConfig.setFieldConfigList(fieldConfigs);
+
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
+ IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+ ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig);
+ fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+
+ // Tear down before validation, because columns.psf and index map cleanup happens at segmentDirectory.close().
+ segmentLocalFSDirectory.close();
+
+ validateIndexMap();
+ validateForwardIndex(column1, newCompressionType);
+ validateForwardIndex(column2, newCompressionType);
+ }
+
+ private void validateForwardIndex(String columnName, FieldConfig.CompressionCodec expectedCompressionType)
+ throws IOException {
+ // Setup
+ SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ SegmentDirectory segmentLocalFSDirectory =
+ new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+ SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+ ColumnMetadata columnMetadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
+
+ // Check the compression type in the forward index header.
+ try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata)) {
+ ChunkCompressionType fwdIndexCompressionType = fwdIndexReader.getCompressionType();
+ assertEquals(fwdIndexCompressionType.name(), expectedCompressionType.name());
+ }
+
+ try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata)) {
+ PinotSegmentColumnReader columnReader =
+ new PinotSegmentColumnReader(forwardIndexReader, null, null, columnMetadata.getMaxNumberOfMultiValues());
+
+ for (int rowIdx = 0; rowIdx < columnMetadata.getTotalDocs(); rowIdx++) {
+ if (rowIdx % 10 == 0) {
+ Object val = columnReader.getValue(rowIdx);
+
+ FieldSpec.DataType dataType = forwardIndexReader.getStoredType();
+ if (dataType == FieldSpec.DataType.STRING) {
+ assertEquals((String) val, "testRow");
+ } else if (dataType == FieldSpec.DataType.INT) {
+ assertEquals((int) val, 1001, columnName + " " + rowIdx + " " + expectedCompressionType);
+ } else if (dataType == FieldSpec.DataType.LONG) {
+ assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType);
+ }
+ }
+ }
+ }
+ }
+
+ private void validateIndexMap()
+ throws IOException {
+ // Sanity check to make sure every raw column has exactly one forward index entry in the index map.
+ for (String columnName : _noDictionaryColumns) {
+ String segmentDir = INDEX_DIR + "/" + SEGMENT_NAME + "/v3";
+ File idxMapFile = new File(segmentDir, V1Constants.INDEX_MAP_FILE_NAME);
+ String indexMapStr = FileUtils.readFileToString(idxMapFile, StandardCharsets.UTF_8);
+ assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".startOffset"), 1);
+ assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".size"), 1);
+ }
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index b4de2f5e22..2bcad84f23 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -43,11 +43,13 @@ import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.Col
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
@@ -308,6 +310,60 @@ public class SegmentPreProcessorTest {
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
}
+ @Test
+ public void testForwardIndexHandler()
+ throws Exception {
+ Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>();
+ ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
+ _indexLoadingConfig.setNoDictionaryColumns(new HashSet<String>() {{
+ add(EXISTING_STRING_COL_RAW);
+ }});
+
+ // Test1: Rewriting forward index will be a no-op for v1 segments. Default LZ4 compressionType will be retained.
+ constructV1Segment();
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4);
+
+ // Convert the segment to V3.
+ new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
+
+ // Test2: Now forward index will be rewritten with ZSTANDARD compressionType.
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType);
+
+ // Test3: Change compression on the existing raw index column. Also add a text index on the same column. Check correctness.
+ newCompressionType = ChunkCompressionType.SNAPPY;
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ Set<String> textIndexColumns = new HashSet<>();
+ textIndexColumns.add(EXISTING_STRING_COL_RAW);
+ _indexLoadingConfig.setTextIndexColumns(textIndexColumns);
+
+ constructV3Segment();
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
+ assertNotNull(columnMetadata);
+ checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
+ validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
+ 0, newCompressionType);
+
+ // Test4: Change compression on the RAW index column. Also add an FST index on a dictionary-encoded column. Check correctness.
+ newCompressionType = ChunkCompressionType.ZSTANDARD;
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ Set<String> fstColumns = new HashSet<>();
+ fstColumns.add(EXISTING_STRING_COL_DICT);
+ _indexLoadingConfig.setFSTIndexColumns(fstColumns);
+
+ constructV3Segment();
+ segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+ assertNotNull(columnMetadata);
+ // Check FST index
+ checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
+ // Check forward index.
+ validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
+ 0, newCompressionType);
+ }
+
/**
* Test to check for default column handling and text index creation during
* segment load after a new dictionary encoded column is added to the schema
@@ -406,28 +462,36 @@ public class SegmentPreProcessorTest {
private void checkFSTIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean isSorted, int dictionaryElementSize)
throws Exception {
- checkIndexCreation(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true, isSorted,
- dictionaryElementSize, true, 0);
+ createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true,
+ isSorted, dictionaryElementSize, true, 0, null);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
throws Exception {
- checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
- isSorted, dictionaryElementSize, true, 0);
+ createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+ hasDictionary, isSorted, dictionaryElementSize, true, 0, null);
}
private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
boolean hasDictionary, boolean isSorted, int dictionaryElementSize, boolean isSingleValue,
int maxNumberOfMultiValues)
throws Exception {
- checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated, hasDictionary,
- isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues);
+ createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+ hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null);
}
- private void checkIndexCreation(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
+ private void checkForwardIndexCreation(String column, int cardinality, int bits, Schema schema,
boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
- boolean isSingleValued, int maxNumberOfMultiValues)
+ ChunkCompressionType expectedCompressionType)
+ throws Exception {
+ createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, bits, schema, isAutoGenerated,
+ hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType);
+ }
+
+ private void createAndValidateIndex(ColumnIndexType indexType, String column, int cardinality, int bits,
+ Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
+ boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
throws Exception {
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -435,33 +499,59 @@ public class SegmentPreProcessorTest {
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
processor.process();
- SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
- ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
- assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
- assertEquals(columnMetadata.getCardinality(), cardinality);
- assertEquals(columnMetadata.getTotalDocs(), 100000);
- assertEquals(columnMetadata.getBitsPerElement(), bits);
- assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
- assertEquals(columnMetadata.isSorted(), isSorted);
- assertEquals(columnMetadata.hasDictionary(), hasDictionary);
- assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
- assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
- assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
-
- try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
- .load(_indexDir.toURI(),
- new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
- SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
- assertTrue(reader.hasIndexFor(column, indexType));
- assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
- // if the text index is enabled on a new column with dictionary,
- // then dictionary should be created by the default column handler
- if (hasDictionary) {
- assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+ validateIndex(indexType, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, isSorted,
+ dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType);
+ }
+ }
+
+ private void validateIndex(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
+ boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
+ boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+ throws Exception {
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+ ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+ assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, DataType.STRING, isSingleValued));
+ assertEquals(columnMetadata.getCardinality(), cardinality);
+ assertEquals(columnMetadata.getTotalDocs(), 100000);
+ assertEquals(columnMetadata.getBitsPerElement(), bits);
+ assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize);
+ assertEquals(columnMetadata.isSorted(), isSorted);
+ assertEquals(columnMetadata.hasDictionary(), hasDictionary);
+ assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues);
+ assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
+ assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
+
+ try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+ .load(_indexDir.toURI(),
+ new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+ SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
+ assertTrue(reader.hasIndexFor(column, indexType));
+ assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+
+ // Check if the raw forward index compressionType is correct.
+ if (expectedCompressionType != null) {
+ try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(reader, columnMetadata)) {
+ ChunkCompressionType compressionType = fwdIndexReader.getCompressionType();
+ assertTrue(compressionType.equals(expectedCompressionType), compressionType.toString());
+ }
+
+ File inProgressFile = new File(_indexDir, column + ".fwd.inprogress");
+ assertTrue(!inProgressFile.exists());
+ File v1FwdIndexFile = new File(_indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ if (segmentMetadata.getVersion() == SegmentVersion.v3) {
+ assertTrue(!v1FwdIndexFile.exists());
} else {
- assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+ assertTrue(v1FwdIndexFile.exists());
}
}
+
+ // if the text index is enabled on a new column with dictionary,
+ // then dictionary should be created by the default column handler
+ if (hasDictionary) {
+ assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+ } else {
+ assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
+ }
}
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 3ffa80ce97..86331a28ae 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.spi.index.reader;
import java.io.Closeable;
import java.math.BigDecimal;
import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -49,6 +50,24 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
*/
DataType getStoredType();
+ /**
+ * Returns the compression type of the underlying raw forward index. Only implemented by readers based on
+ * BaseChunkForwardIndexReader; returns null otherwise.
+ * @return the compression type, or null if not applicable
+ */
+ default ChunkCompressionType getCompressionType() {
+ return null;
+ }
+
+ /**
+ * Returns the length of the longest entry. Only implemented by readers based on BaseChunkForwardIndexReader;
+ * returns -1 otherwise.
+ * @return the length of the longest entry, or -1 if not applicable
+ */
+ default int getLengthOfLongestEntry() {
+ return -1;
+ }
+
/**
* Creates a new {@link ForwardIndexReaderContext} of the reader which can be used to accelerate the reads.
* NOTE: Caller is responsible for closing the returned reader context.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
index 2650fb6bd5..37c9d608af 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexType.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.segment.spi.store;
+// Note: It is required to keep DICTIONARY and FORWARD_INDEX as the starting entries in this enum to ensure
+// correctness when preprocessing a segment during segmentReload. Please add new entries at the end (or at least
+// below DICTIONARY and FORWARD_INDEX).
public enum ColumnIndexType {
DICTIONARY("dictionary"),
FORWARD_INDEX("forward_index"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org