You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/28 23:40:47 UTC
(pinot) branch master updated: Add a new MV forward index to only store unique MV values (#11993)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d654cc9db2 Add a new MV forward index to only store unique MV values (#11993)
d654cc9db2 is described below
commit d654cc9db2c8561d8b7856c316c6a00c73cc7ff5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 28 15:40:42 2023 -0800
Add a new MV forward index to only store unique MV values (#11993)
---
.../pinot/core/startree/v2/BaseStarTreeV2Test.java | 7 +-
.../tests/OfflineClusterIntegrationTest.java | 48 +++-
.../local/io/util/FixedBitIntReaderWriter.java | 8 +
.../segment/local/io/util/PinotDataBitSet.java | 6 +
.../FixedBitMVEntryDictForwardIndexWriter.java | 126 +++++++++
.../MultiValueEntryDictForwardIndexCreator.java | 67 +++++
.../index/forward/ForwardIndexCreatorFactory.java | 10 +-
.../index/forward/ForwardIndexReaderFactory.java | 11 +-
.../segment/index/forward/ForwardIndexType.java | 78 ++----
.../segment/index/loader/ForwardIndexHandler.java | 284 ++++++++++-----------
.../segment/index/loader/IndexLoadingConfig.java | 15 +-
.../defaultcolumn/BaseDefaultColumnHandler.java | 19 +-
.../FixedBitMVEntryDictForwardIndexReader.java | 155 +++++++++++
.../segment/local/utils/TableConfigUtils.java | 106 ++++----
.../FixedBitMVEntryDictForwardIndexTest.java | 123 +++++++++
.../index/forward/ForwardIndexTypeTest.java | 56 ++--
.../index/loader/ForwardIndexHandlerTest.java | 151 ++++++++---
.../index/loader/SegmentPreProcessorTest.java | 27 +-
.../segment/local/utils/TableConfigUtilsTest.java | 60 +++--
.../spi/compression/DictIdCompressionType.java | 47 ++++
.../segment/spi/index/ForwardIndexConfig.java | 81 ++++--
.../spi/index/reader/ForwardIndexReader.java | 11 +-
.../apache/pinot/spi/config/table/FieldConfig.java | 24 +-
23 files changed, 1121 insertions(+), 399 deletions(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 3787363d17..63cc2c2bda 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -462,7 +462,12 @@ abstract class BaseStarTreeV2Test<R, A> {
*/
CompressionCodec getCompressionCodec() {
CompressionCodec[] compressionCodecs = CompressionCodec.values();
- return compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
+ while (true) {
+ CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
+ if (compressionCodec.isApplicableToRawIndex()) {
+ return compressionCodec;
+ }
+ }
}
abstract ValueAggregator<R, A> getValueAggregator();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 824991d86f..86bafbd2f0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -64,6 +64,8 @@ import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -154,7 +156,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap";
private static final String COLUMN_CARDINALITY_MAP_KEY = "columnCardinalityMap";
private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = "maxNumMultiValuesMap";
- private static final int DISK_SIZE_IN_BYTES = 20798784;
+ private static final int DISK_SIZE_IN_BYTES = 20277762;
private static final int NUM_ROWS = 115545;
private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
@@ -178,6 +180,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
return _schemaFileName;
}
+  /**
+   * Returns the field configs used by this test: a single config that declares the "DivAirports" column as
+   * dictionary-encoded with the MV_ENTRY_DICT compression codec.
+   */
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    FieldConfig divAirportsFieldConfig =
+        new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+            CompressionCodec.MV_ENTRY_DICT, null);
+    return Collections.singletonList(divAirportsFieldConfig);
+  }
+
@BeforeClass
public void setUp()
throws Exception {
@@ -645,7 +654,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testMaxQueryResponseSizeTableConfig() throws Exception {
+ public void testMaxQueryResponseSizeTableConfig()
+ throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L, null));
updateTableConfig(tableConfig);
@@ -677,7 +687,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testMaxServerResponseSizeTableConfig() throws Exception {
+ public void testMaxServerResponseSizeTableConfig()
+ throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 1000L));
updateTableConfig(tableConfig);
@@ -709,7 +720,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testMaxResponseSizeTableConfigOrdering() throws Exception {
+ public void testMaxResponseSizeTableConfigOrdering()
+ throws Exception {
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000000L, 1000L));
updateTableConfig(tableConfig);
@@ -1569,12 +1581,39 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
+ List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+ assertNotNull(fieldConfigList);
+ fieldConfigList.add(
+ new FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+ CompressionCodec.MV_ENTRY_DICT, null));
+ fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY,
+ Collections.emptyList(), CompressionCodec.MV_ENTRY_DICT, null));
updateTableConfig(tableConfig);
// Trigger reload
reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
numTotalDocs);
+
+ // Verify the index sizes
+ JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
+ getControllerBaseApiUrl() + "/tables/mytable/metadata?columns=DivAirportSeqIDs"
+ + "&columns=NewAddedDerivedDivAirportSeqIDs&columns=NewAddedDerivedDivAirportSeqIDsString"))
+ .get("columnIndexSizeMap");
+ assertEquals(columnIndexSizeMap.size(), 3);
+ JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs");
+ JsonNode derivedColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDs");
+ JsonNode derivedStringColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDsString");
+
+ // Derived int column should have the same dictionary size as the original column
+ double originalColumnDictionarySize = originalColumnIndexSizes.get("dictionary").asDouble();
+ assertEquals(derivedColumnIndexSizes.get("dictionary").asDouble(), originalColumnDictionarySize);
+ // Derived string column should have larger dictionary size than the original column
+ assertTrue(derivedStringColumnIndexSizes.get("dictionary").asDouble() > originalColumnDictionarySize);
+ // Both derived columns should have smaller forward index size than the original column because of compression
+ double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get("forward_index").asDouble();
+ assertTrue(derivedColumnForwardIndexSize < originalColumnIndexSizes.get("forward_index").asDouble());
+ assertEquals(derivedStringColumnIndexSizes.get("forward_index").asDouble(), derivedColumnForwardIndexSize);
}
private void reloadWithMissingColumns()
@@ -1582,6 +1621,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Remove columns from the table config first to pass the validation of the table config
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setIngestionConfig(null);
+ tableConfig.setFieldConfigList(getFieldConfigs());
updateTableConfig(tableConfig);
// Need to first delete then add the schema because removing columns is backward-incompatible change
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
index 76e1a154c8..3ef165740f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
@@ -50,6 +50,14 @@ public final class FixedBitIntReaderWriter implements Closeable {
_dataBitSet.writeInt(startIndex, _numBitsPerValue, length, values);
}
+  /**
+   * Returns the offset of the byte that contains the first bit of the value at the given index.
+   */
+  public int getStartByteOffset(int index) {
+    // Compute the bit offset as long to avoid int overflow for large indexes
+    long firstBitOffset = (long) index * _numBitsPerValue;
+    return (int) (firstBitOffset / Byte.SIZE);
+  }
+
+  /**
+   * Returns the exclusive end byte offset of the value at the given index, i.e. the offset of the byte right after
+   * the byte that contains the last bit of the value.
+   */
+  public int getEndByteOffset(int index) {
+    // Compute the bit offset as long to avoid int overflow for large indexes
+    long lastBitOffset = (long) (index + 1) * _numBitsPerValue - 1;
+    return (int) (lastBitOffset / Byte.SIZE) + 1;
+  }
+
@Override
public void close() {
_dataBitSet.close();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
index 4ed84a2c0c..a42665e941 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
@@ -102,6 +102,9 @@ public final class PinotDataBitSet implements Closeable {
}
public void readInt(int startIndex, int numBitsPerValue, int length, int[] buffer) {
+ if (length == 0) {
+ return;
+ }
long startBitOffset = (long) startIndex * numBitsPerValue;
int byteOffset = (int) (startBitOffset / Byte.SIZE);
int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE);
@@ -167,6 +170,9 @@ public final class PinotDataBitSet implements Closeable {
}
public void writeInt(int startIndex, int numBitsPerValue, int length, int[] values) {
+ if (length == 0) {
+ return;
+ }
long startBitOffset = (long) startIndex * numBitsPerValue;
int byteOffset = (int) (startBitOffset / Byte.SIZE);
int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java
new file mode 100644
index 0000000000..80a9a3dfa3
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+
+
+/**
+ * Bit-compressed dictionary-encoded forward index writer for multi-value columns, where a second level dictionary
+ * encoding for multi-value entries (instead of individual values within the entry) is maintained within the forward
+ * index.
+ *
+ * Index layout:
+ * - Index header (24 bytes)
+ * - ID buffer (stores the multi-value entry id for each doc id)
+ * - Offset buffer (stores the start offset of each multi-value entry, followed by the end offset of the last entry)
+ * - Value buffer (stores the individual values)
+ *
+ * Header layout:
+ * - Magic marker (4 bytes)
+ * - Version (2 bytes)
+ * - Bits per value (1 byte)
+ * - Bits per id (1 byte)
+ * - Number of unique MV entries (4 bytes)
+ * - Number of total values (4 bytes)
+ * - Start offset of offset buffer (4 bytes)
+ * - Start offset of value buffer (4 bytes)
+ *
+ * All entries are buffered in memory, and the index file is only written (in big-endian byte order) when
+ * {@link #close()} is called.
+ */
+public class FixedBitMVEntryDictForwardIndexWriter implements Closeable {
+  public static final int MAGIC_MARKER = 0xffabcdef;
+  public static final short VERSION = 1;
+  public static final int HEADER_SIZE = 24;
+
+  // Maps each unique multi-value entry to its id. Ids are assigned sequentially in order of first appearance.
+  private final Object2IntOpenHashMap<IntArrayList> _entryToIdMap = new Object2IntOpenHashMap<>();
+  private final File _file;
+  private final int _numBitsPerValue;
+  // Entry id of each doc, in the order the docs are added
+  private final IntArrayList _ids;
+
+  /**
+   * @param file Index file to write to
+   * @param numDocs Expected number of docs, used to pre-size the in-memory id list
+   * @param numBitsPerValue Number of bits used to store each individual dict id in the value buffer
+   */
+  public FixedBitMVEntryDictForwardIndexWriter(File file, int numDocs, int numBitsPerValue) {
+    _file = file;
+    _numBitsPerValue = numBitsPerValue;
+    _ids = new IntArrayList(numDocs);
+  }
+
+  /**
+   * Adds the multi-value entry of the next doc.
+   *
+   * NOTE: The given array is wrapped (not copied) and used as the lookup key, so it must not be modified after being
+   * passed in.
+   */
+  public void putDictIds(int[] dictIds) {
+    // Lookup the map, and create a new id when the entry is not found.
+    _ids.add(_entryToIdMap.computeIntIfAbsent(IntArrayList.wrap(dictIds), k -> _entryToIdMap.size()));
+  }
+
+  /**
+   * Writes the header, id buffer, offset buffer and value buffer into the index file.
+   */
+  @Override
+  public void close()
+      throws IOException {
+    int numUniqueEntries = _entryToIdMap.size();
+    // Invert the entry -> id map so that the entries can be written out in id order
+    int[][] idToDictIdsMap = new int[numUniqueEntries][];
+    int numTotalValues = 0;
+    for (Object2IntMap.Entry<IntArrayList> entry : _entryToIdMap.object2IntEntrySet()) {
+      int id = entry.getIntValue();
+      // Keys are created with IntArrayList.wrap(), so the backing array holds exactly the values of the entry
+      int[] dictIds = entry.getKey().elements();
+      idToDictIdsMap[id] = dictIds;
+      numTotalValues += dictIds.length;
+    }
+    // The backing array of the id list can be longer than the number of ids added (its length is the capacity), so
+    // always use size() instead of the array length as the number of ids
+    int numIds = _ids.size();
+    int[] ids = _ids.elements();
+    int numBitsPerId = PinotDataBitSet.getNumBitsPerValue(numUniqueEntries - 1);
+    int idBufferSize = (int) (((long) numIds * numBitsPerId + 7) / 8);
+    // Offsets range from 0 to numTotalValues (inclusive) because the end offset of the last entry is also stored
+    int numBitsPerOffset = PinotDataBitSet.getNumBitsPerValue(numTotalValues);
+    int offsetBufferSize = (int) (((long) (numUniqueEntries + 1) * numBitsPerOffset + 7) / 8);
+    int valueBufferSize = (int) (((long) numTotalValues * _numBitsPerValue + 7) / 8);
+    int offsetBufferOffset = HEADER_SIZE + idBufferSize;
+    int valueBufferOffset = offsetBufferOffset + offsetBufferSize;
+    int indexSize = valueBufferOffset + valueBufferSize;
+    try (PinotDataBuffer indexBuffer = PinotDataBuffer.mapFile(_file, false, 0, indexSize, ByteOrder.BIG_ENDIAN,
+        getClass().getSimpleName())) {
+      // Header
+      indexBuffer.putInt(0, MAGIC_MARKER);
+      indexBuffer.putShort(4, VERSION);
+      indexBuffer.putByte(6, (byte) _numBitsPerValue);
+      indexBuffer.putByte(7, (byte) numBitsPerId);
+      indexBuffer.putInt(8, numUniqueEntries);
+      indexBuffer.putInt(12, numTotalValues);
+      indexBuffer.putInt(16, offsetBufferOffset);
+      indexBuffer.putInt(20, valueBufferOffset);
+
+      // ID buffer
+      try (FixedBitIntReaderWriter idWriter = new FixedBitIntReaderWriter(
+          indexBuffer.view(HEADER_SIZE, offsetBufferOffset), numIds, numBitsPerId)) {
+        idWriter.writeInt(0, numIds, ids);
+      }
+      // Offset buffer and value buffer
+      try (FixedBitIntReaderWriter offsetWriter = new FixedBitIntReaderWriter(
+          indexBuffer.view(offsetBufferOffset, valueBufferOffset), numUniqueEntries + 1, numBitsPerOffset);
+          FixedBitIntReaderWriter valueWriter = new FixedBitIntReaderWriter(
+              indexBuffer.view(valueBufferOffset, indexSize), numTotalValues, _numBitsPerValue)) {
+        int startOffset = 0;
+        for (int i = 0; i < numUniqueEntries; i++) {
+          offsetWriter.writeInt(i, startOffset);
+          int[] dictIds = idToDictIdsMap[i];
+          valueWriter.writeInt(startOffset, dictIds.length, dictIds);
+          startOffset += dictIds.length;
+        }
+        // End offset of the last entry
+        offsetWriter.writeInt(numUniqueEntries, startOffset);
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java
new file mode 100644
index 0000000000..2db269f038
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for dictionary-encoded multi-value column, where multi-value entries are dictionary encoded.
+ * All the writing is delegated to a {@link FixedBitMVEntryDictForwardIndexWriter}.
+ */
+public class MultiValueEntryDictForwardIndexCreator implements ForwardIndexCreator {
+  private final FixedBitMVEntryDictForwardIndexWriter _writer;
+
+  /**
+   * @param outputDir Directory in which the index file is created
+   * @param column Column name, used as the prefix of the index file name
+   * @param cardinality Cardinality of the column, used to derive the number of bits per dict id
+   * @param numDocs Number of docs, passed through to the underlying writer
+   */
+  public MultiValueEntryDictForwardIndexCreator(File outputDir, String column, int cardinality, int numDocs) {
+    String indexFileName = column + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+    // The largest dict id is (cardinality - 1)
+    int maxDictId = cardinality - 1;
+    _writer = new FixedBitMVEntryDictForwardIndexWriter(new File(outputDir, indexFileName), numDocs,
+        PinotDataBitSet.getNumBitsPerValue(maxDictId));
+  }
+
+  /** Always {@code true}: this creator stores dict ids. */
+  @Override
+  public boolean isDictionaryEncoded() {
+    return true;
+  }
+
+  /** Always {@code false}: this creator only handles multi-value columns. */
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  /** Always {@link DataType#INT}. */
+  @Override
+  public DataType getValueType() {
+    return DataType.INT;
+  }
+
+  /** Hands the dict ids of the next doc over to the underlying writer. */
+  @Override
+  public void putDictIdMV(int[] dictIds) {
+    _writer.putDictIds(dictIds);
+  }
+
+  /** Closes the underlying writer. */
+  @Override
+  public void close()
+      throws IOException {
+    _writer.close();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index a57158b44f..ded5000b0c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
import java.io.File;
import java.io.IOException;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
@@ -29,6 +30,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSorted
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -57,8 +59,12 @@ public class ForwardIndexCreatorFactory {
return new SingleValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs);
}
} else {
- return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs,
- context.getTotalNumberOfEntries());
+ if (indexConfig.getDictIdCompressionType() == DictIdCompressionType.MV_ENTRY_DICT) {
+ return new MultiValueEntryDictForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs);
+ } else {
+ return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs,
+ context.getTotalNumberOfEntries());
+ }
}
} else {
// Dictionary disabled columns
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index 8408a9026d..b5ca2b83c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -20,6 +20,7 @@
package org.apache.pinot.segment.local.segment.index.forward;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
@@ -72,8 +73,14 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<Forwar
return new FixedBitSVForwardIndexReaderV2(dataBuffer, metadata.getTotalDocs(), metadata.getBitsPerElement());
}
} else {
- return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(), metadata.getTotalNumberOfEntries(),
- metadata.getBitsPerElement());
+ if (dataBuffer.size() > Integer.BYTES
+ && dataBuffer.getInt(0) == FixedBitMVEntryDictForwardIndexReader.MAGIC_MARKER) {
+ return new FixedBitMVEntryDictForwardIndexReader(dataBuffer, metadata.getTotalDocs(),
+ metadata.getBitsPerElement());
+ } else {
+ return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(),
+ metadata.getTotalNumberOfEntries(), metadata.getBitsPerElement());
+ }
}
} else {
return createRawIndexReader(dataBuffer, metadata.getDataType().getStoredType(), metadata.isSingleValue());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index 369341be6c..a454bf9cfb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.segment.index.forward;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -29,7 +28,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -59,6 +57,7 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -91,54 +90,29 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
@Override
public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
- Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns();
+ Set<String> disabledColumns = indexLoadingConfig.getForwardIndexDisabledColumns();
+ Map<String, CompressionCodec> compressionCodecMap = indexLoadingConfig.getCompressionConfigs();
Map<String, ForwardIndexConfig> result = new HashMap<>();
- Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns());
- for (String column : allColumns) {
- ChunkCompressionType compressionType =
- indexLoadingConfig.getCompressionConfigs() != null
- ? indexLoadingConfig.getCompressionConfigs().get(column)
- : null;
- Supplier<ForwardIndexConfig> defaultConfig = () -> {
- if (compressionType == null) {
- return ForwardIndexConfig.DEFAULT;
- } else {
- return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
- }
- };
- if (!disabledCols.contains(column)) {
- TableConfig tableConfig = indexLoadingConfig.getTableConfig();
- if (tableConfig == null) {
- result.put(column, defaultConfig.get());
- } else {
- List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
- if (fieldConfigList == null) {
- result.put(column, defaultConfig.get());
- continue;
- }
- FieldConfig fieldConfig = fieldConfigList.stream()
- .filter(fc -> fc.getName().equals(column))
- .findAny()
- .orElse(null);
- if (fieldConfig == null) {
- result.put(column, defaultConfig.get());
- continue;
- }
- ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
- if (compressionType != null) {
- builder.withCompressionType(compressionType);
- } else {
- FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
- if (compressionCodec != null) {
- builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+ for (String column : indexLoadingConfig.getAllKnownColumns()) {
+ ForwardIndexConfig forwardIndexConfig;
+ if (!disabledColumns.contains(column)) {
+ CompressionCodec compressionCodec = compressionCodecMap.get(column);
+ if (compressionCodec == null) {
+ TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+ if (tableConfig != null && tableConfig.getFieldConfigList() != null) {
+ FieldConfig fieldConfig =
+ tableConfig.getFieldConfigList().stream().filter(fc -> fc.getName().equals(column)).findAny()
+ .orElse(null);
+ if (fieldConfig != null) {
+ compressionCodec = fieldConfig.getCompressionCodec();
}
}
-
- result.put(column, builder.build());
}
+ forwardIndexConfig = new ForwardIndexConfig.Builder().withCompressionCodec(compressionCodec).build();
} else {
- result.put(column, ForwardIndexConfig.DISABLED);
+ forwardIndexConfig = ForwardIndexConfig.DISABLED;
}
+ result.put(column, forwardIndexConfig);
}
return result;
}
@@ -169,13 +143,10 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
if (properties != null && isDisabled(properties)) {
fwdConfig.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
} else {
- DictionaryIndexConfig dictConfig = dictConfigs.get(fieldConfig.getName());
- if (dictConfig != null && dictConfig.isDisabled()) {
- fwdConfig.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig));
+ ForwardIndexConfig config = createConfigFromFieldConfig(fieldConfig);
+ if (!config.equals(ForwardIndexConfig.DEFAULT)) {
+ fwdConfig.put(fieldConfig.getName(), config);
}
- // On other case encoding is DICTIONARY. We create the default forward index by default. That means that if
- // field config indicates for example a compression while using encoding dictionary, the compression is
- // ignored.
// It is important to do not explicitly add the default value here in order to avoid exclusive problems with
// the default `fromIndexes` deserializer.
}
@@ -193,17 +164,12 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
}
private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) {
- FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
- if (compressionCodec != null) {
- builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
- }
-
+ builder.withCompressionCodec(fieldConfig.getCompressionCodec());
Map<String, String> properties = fieldConfig.getProperties();
if (properties != null) {
builder.withLegacyProperties(properties);
}
-
return builder.build();
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 5b5ad75de6..ac32a0a354 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -47,6 +46,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -98,11 +98,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
private final Schema _schema;
protected enum Operation {
- DISABLE_FORWARD_INDEX,
- ENABLE_FORWARD_INDEX,
- DISABLE_DICTIONARY,
- ENABLE_DICTIONARY,
- CHANGE_RAW_INDEX_COMPRESSION_TYPE,
+ DISABLE_FORWARD_INDEX, ENABLE_FORWARD_INDEX, DISABLE_DICTIONARY, ENABLE_DICTIONARY, CHANGE_INDEX_COMPRESSION_TYPE
}
@VisibleForTesting
@@ -147,8 +143,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, false);
if (columnMetadata.hasDictionary()) {
if (!segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
- throw new IllegalStateException(String.format("Dictionary should still exist after rebuilding "
- + "forward index for dictionary column: %s", column));
+ throw new IllegalStateException(String.format(
+ "Dictionary should still exist after rebuilding forward index for dictionary column: %s", column));
}
} else {
if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
@@ -159,8 +155,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
}
break;
case DISABLE_DICTIONARY:
- Set<String> newForwardIndexDisabledColumns = FieldIndexConfigsUtil.columnsWithIndexDisabled(
- _fieldIndexConfigs.keySet(), StandardIndexes.forward(), _fieldIndexConfigs);
+ Set<String> newForwardIndexDisabledColumns =
+ FieldIndexConfigsUtil.columnsWithIndexDisabled(_fieldIndexConfigs.keySet(), StandardIndexes.forward(),
+ _fieldIndexConfigs);
if (newForwardIndexDisabledColumns.contains(column)) {
removeDictionaryFromForwardIndexDisabledColumn(column, segmentWriter);
if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
@@ -177,8 +174,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
throw new IllegalStateException(String.format("Forward index was not created for column: %s", column));
}
break;
- case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
- rewriteRawForwardIndexForCompressionChange(column, segmentWriter);
+ case CHANGE_INDEX_COMPRESSION_TYPE:
+ rewriteForwardIndexForCompressionChange(column, segmentWriter);
break;
default:
throw new IllegalStateException("Unsupported operation for column " + column);
@@ -197,26 +194,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
return columnOperationsMap;
}
- // From existing column config.
Set<String> existingAllColumns = _segmentDirectory.getSegmentMetadata().getAllColumns();
- Set<String> existingDictColumns =
- segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.dictionary());
- Set<String> existingNoDictColumns = new HashSet<>();
- for (String column : existingAllColumns) {
- if (!existingDictColumns.contains(column)) {
- existingNoDictColumns.add(column);
- }
- }
-
- // Get list of columns with forward index and those without forward index
- Set<String> existingForwardIndexColumns =
- segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.forward());
- Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
- for (String column : existingAllColumns) {
- if (!existingForwardIndexColumns.contains(column)) {
- existingForwardIndexDisabledColumns.add(column);
- }
- }
+ Set<String> existingDictColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.dictionary());
+ Set<String> existingForwardIndexColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.forward());
for (String column : existingAllColumns) {
if (_schema != null && !_schema.hasColumn(column)) {
@@ -224,12 +204,14 @@ public class ForwardIndexHandler extends BaseIndexHandler {
LOGGER.info("Column {} is not in schema, skipping updating forward index", column);
continue;
}
+ boolean existingHasDict = existingDictColumns.contains(column);
+ boolean existingHasFwd = existingForwardIndexColumns.contains(column);
FieldIndexConfigs newConf = _fieldIndexConfigs.get(column);
boolean newIsFwd = newConf.getConfig(StandardIndexes.forward()).isEnabled();
boolean newIsDict = newConf.getConfig(StandardIndexes.dictionary()).isEnabled();
boolean newIsRange = newConf.getConfig(StandardIndexes.range()).isEnabled();
- if (existingForwardIndexColumns.contains(column) && !newIsFwd) {
+ if (existingHasFwd && !newIsFwd) {
// Existing column has a forward index. New column config disables the forward index
ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
@@ -240,13 +222,13 @@ public class ForwardIndexHandler extends BaseIndexHandler {
continue;
}
- if (existingDictColumns.contains(column)) {
+ if (existingHasDict) {
if (!newIsDict) {
// Dictionary was also disabled. Just disable the dictionary and remove it along with the forward index
// If range index exists, don't try to regenerate it on toggling the dictionary, throw an error instead
- Preconditions.checkState(!newIsRange,
- String.format("Must disable range (enabled) index to disable the dictionary and forward index for "
- + "column: %s or refresh / back-fill the forward index", column));
+ Preconditions.checkState(!newIsRange, String.format(
+ "Must disable range (enabled) index to disable the dictionary and forward index for column: %s or "
+ + "refresh / back-fill the forward index", column));
columnOperationsMap.put(column,
Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.DISABLE_DICTIONARY));
} else {
@@ -263,7 +245,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.ENABLE_DICTIONARY));
}
}
- } else if (existingForwardIndexDisabledColumns.contains(column) && newIsFwd) {
+ } else if (!existingHasFwd && newIsFwd) {
// Existing column does not have a forward index. New column config enables the forward index
ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
@@ -277,37 +259,37 @@ public class ForwardIndexHandler extends BaseIndexHandler {
// Get list of columns with inverted index
Set<String> existingInvertedIndexColumns =
segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.inverted());
- if (!existingDictColumns.contains(column) || !existingInvertedIndexColumns.contains(column)) {
+ if (!existingHasDict || !existingInvertedIndexColumns.contains(column)) {
// If either dictionary or inverted index is missing on the column there is no way to re-generate the forward
// index. Treat this as a no-op and log a warning.
LOGGER.warn("Trying to enable the forward index for a column {} missing either the dictionary ({}) and / or "
+ "the inverted index ({}) is not possible. Either a refresh or back-fill is required to get the "
- + "forward index, ignoring", column, existingDictColumns.contains(column) ? "enabled" : "disabled",
+ + "forward index, ignoring", column, existingHasDict ? "enabled" : "disabled",
existingInvertedIndexColumns.contains(column) ? "enabled" : "disabled");
continue;
}
columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_FORWARD_INDEX));
- } else if (existingForwardIndexDisabledColumns.contains(column) && !newIsFwd) {
+ } else if (!existingHasFwd) {
// Forward index is disabled for the existing column and should remain disabled based on the latest config
// Need some checks to see whether the dictionary is being enabled or disabled here and take appropriate actions
// If the dictionary is not enabled on the existing column it must be on the new noDictionary column list.
// Cannot enable the dictionary for a column with forward index disabled.
- Preconditions.checkState(existingDictColumns.contains(column) || !newIsDict,
+ Preconditions.checkState(existingHasDict || !newIsDict,
String.format("Cannot regenerate the dictionary for column %s with forward index disabled. Please "
+ "refresh or back-fill the data to add back the forward index", column));
- if (existingDictColumns.contains(column) && !newIsDict) {
+ if (existingHasDict && !newIsDict) {
// Dictionary is currently enabled on this column but is supposed to be disabled. Remove the dictionary
// and update the segment metadata. If the range index exists then throw an error since we are not
// regenerating the range index on toggling the dictionary
- Preconditions.checkState(!newIsRange,
- String.format("Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled "
- + "column: %s or refresh / back-fill the forward index", column));
+ Preconditions.checkState(!newIsRange, String.format(
+ "Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled column: %s or "
+ + "refresh / back-fill the forward index", column));
columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
}
- } else if (existingNoDictColumns.contains(column) && newIsDict) {
+ } else if (!existingHasDict && newIsDict) {
// Existing column is RAW. New column is dictionary enabled.
if (_schema == null || _tableConfig == null) {
// This can only happen in tests.
@@ -315,25 +297,28 @@ public class ForwardIndexHandler extends BaseIndexHandler {
continue;
}
ColumnMetadata existingColumnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
- if (DictionaryIndexType.ignoreDictionaryOverride(
- _tableConfig.getIndexingConfig().isOptimizeDictionary(),
+ if (DictionaryIndexType.ignoreDictionaryOverride(_tableConfig.getIndexingConfig().isOptimizeDictionary(),
_tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
- _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
- existingColumnMetadata.getFieldSpec(), _fieldIndexConfigs.get(column),
- existingColumnMetadata.getCardinality(),
+ _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), existingColumnMetadata.getFieldSpec(),
+ _fieldIndexConfigs.get(column), existingColumnMetadata.getCardinality(),
existingColumnMetadata.getTotalNumberOfEntries())) {
columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_DICTIONARY));
}
- } else if (existingDictColumns.contains(column) && !newIsDict) {
+ } else if (existingHasDict && !newIsDict) {
// Existing column has dictionary. New config for the column is RAW.
if (shouldDisableDictionary(column, _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column))) {
columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
}
- } else if (existingNoDictColumns.contains(column) && !newIsDict) {
+ } else if (!existingHasDict) {
// Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
// TODO: Also check if raw index version needs to be changed
- if (shouldChangeCompressionType(column, segmentReader)) {
- columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+ if (shouldChangeRawCompressionType(column, segmentReader)) {
+ columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE));
+ }
+ } else {
+ // Both existing and new column is dictionary encoded. Check if compression needs to be changed.
+ if (shouldChangeDictIdCompressionType(column, segmentReader)) {
+ columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE));
}
}
}
@@ -346,8 +331,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
// Remove the dictionary and update the metadata to indicate that the dictionary is no longer present
segmentWriter.removeIndex(column, StandardIndexes.dictionary());
String segmentName = _segmentDirectory.getSegmentMetadata().getName();
- LOGGER.info("Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and "
- + "column={}", segmentName, column);
+ LOGGER.info(
+ "Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and column={}",
+ segmentName, column);
Map<String, String> metadataProperties = new HashMap<>();
metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false));
metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0));
@@ -381,37 +367,63 @@ public class ForwardIndexHandler extends BaseIndexHandler {
return true;
}
- private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader)
+ private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.Reader segmentReader)
throws Exception {
- ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
-
// The compression type for an existing segment can only be determined by reading the forward index header.
+ ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+ ChunkCompressionType existingCompressionType;
try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) {
- ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
+ existingCompressionType = fwdIndexReader.getCompressionType();
Preconditions.checkState(existingCompressionType != null,
"Existing compressionType cannot be null for raw forward index column=" + column);
+ }
- // Get the new compression type.
- ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward())
- .getChunkCompressionType();
+ // Get the new compression type.
+ ChunkCompressionType newCompressionType =
+ _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
- // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
- // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
- // forward indexes during segmentReload when the default compressionType changes.
- return newCompressionType != null && existingCompressionType != newCompressionType;
+ // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
+ // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
+ // forward indexes during segmentReload when the default compressionType changes.
+ return newCompressionType != null && existingCompressionType != newCompressionType;
+ }
+
+ private boolean shouldChangeDictIdCompressionType(String column, SegmentDirectory.Reader segmentReader)
+ throws Exception {
+ // The compression type for an existing segment can only be determined by reading the forward index header.
+ ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+ DictIdCompressionType existingCompressionType;
+ try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) {
+ existingCompressionType = fwdIndexReader.getDictIdCompressionType();
}
+
+ // Get the new compression type.
+ DictIdCompressionType newCompressionType =
+ _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getDictIdCompressionType();
+ if (newCompressionType != null && !newCompressionType.isApplicable(existingColMetadata.isSingleValue())) {
+ newCompressionType = null;
+ }
+
+ return existingCompressionType != newCompressionType;
}
- private void rewriteRawForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter)
+ private void rewriteForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter)
throws Exception {
ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
boolean isSingleValue = existingColMetadata.isSingleValue();
+ boolean hasDictionary = existingColMetadata.hasDictionary();
File indexDir = _segmentDirectory.getSegmentMetadata().getIndexDir();
String segmentName = _segmentDirectory.getSegmentMetadata().getName();
File inProgress = new File(indexDir, column + ".fwd.inprogress");
- String fileExtension = isSingleValue ? V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
- : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+ String fileExtension;
+ if (isSingleValue) {
+ fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+ : V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION
+ : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
File fwdIndexFile = new File(indexDir, column + fileExtension);
if (!inProgress.exists()) {
@@ -425,20 +437,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
}
LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
-
- // At this point, compressionConfigs is guaranteed to contain the column. If there's no entry in the map, we
- // wouldn't have computed the CHANGE_RAW_COMPRESSION_TYPE operation for this column as compressionType changes
- // are processed only if a valid compressionType is specified in fieldConfig.
- ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column)
- .getConfig(StandardIndexes.forward()).getChunkCompressionType();
-
- if (isSingleValue) {
- rewriteRawSVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter,
- newCompressionType);
- } else {
- rewriteRawMVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter,
- newCompressionType);
- }
+ rewriteForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter);
// We used the existing forward index to generate a new forward index. The existing forward index will be in V3
// format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
@@ -454,61 +453,32 @@ public class ForwardIndexHandler extends BaseIndexHandler {
LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
}
- private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata,
- File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType)
+ private void rewriteForwardIndexForCompressionChange(String column, ColumnMetadata columnMetadata, File indexDir,
+ SegmentDirectory.Writer segmentWriter)
throws Exception {
- try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) {
- // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following
- // components:
- // 1. bytes required to store the actual elements of the MV row (A)
- // 2. bytes required to store the number of elements in the MV row (B)
- // 3. bytes required to store the length of each MV element (C)
- //
- // lengthOfLongestEntry = A + B + C
- // maxRowLengthInBytes = A
+ try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, columnMetadata)) {
int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
- int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
- int maxRowLengthInBytes =
- MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries);
-
- IndexCreationContext context = IndexCreationContext.builder()
- .withMaxRowLengthInBytes(maxRowLengthInBytes)
- .withIndexDir(indexDir)
- .withColumnMetadata(existingColMetadata)
- .withLengthOfLongestEntry(lengthOfLongestEntry)
- .build();
-
- ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
- try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) {
- if (!reader.getStoredType().equals(creator.getValueType())) {
- // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
- String failureMsg =
- "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
- .toString() + " to " + creator.getValueType().toString();
- throw new UnsupportedOperationException(failureMsg);
- }
-
- int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
+ IndexCreationContext context;
+ if (columnMetadata.isSingleValue()) {
+ context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+ .withLengthOfLongestEntry(lengthOfLongestEntry).build();
+ } else {
+ // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following
+ // components:
+ // 1. bytes required to store the actual elements of the MV row (A)
+ // 2. bytes required to store the number of elements in the MV row (B)
+ // 3. bytes required to store the length of each MV element (C)
+ //
+ // lengthOfLongestEntry = A + B + C
+ // maxRowLengthInBytes = A
+ int maxNumValuesPerEntry = columnMetadata.getMaxNumberOfMultiValues();
+ int maxRowLengthInBytes =
+ MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumValuesPerEntry);
+ context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+ .withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build();
}
- }
- }
-
- private void rewriteRawSVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata,
- File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType)
- throws Exception {
- try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) {
- int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-
- IndexCreationContext context = IndexCreationContext.builder()
- .withIndexDir(indexDir)
- .withColumnMetadata(existingColMetadata)
- .withLengthOfLongestEntry(lengthOfLongestEntry)
- .build();
ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) {
if (!reader.getStoredType().equals(creator.getValueType())) {
// Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
@@ -518,8 +488,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
throw new UnsupportedOperationException(failureMsg);
}
- int numDocs = existingColMetadata.getTotalDocs();
- forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
+ int numDocs = columnMetadata.getTotalDocs();
+ forwardIndexRewriteHelper(column, columnMetadata, reader, creator, numDocs, null, null);
}
}
}
@@ -528,16 +498,39 @@ public class ForwardIndexHandler extends BaseIndexHandler {
ForwardIndexReader<?> reader, ForwardIndexCreator creator, int numDocs,
@Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable Dictionary dictionaryReader) {
if (dictionaryReader == null && dictionaryCreator == null) {
- // Read raw forward index and write raw forward index.
- forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs);
- } else if (dictionaryReader != null && dictionaryCreator == null) {
+ if (reader.isDictionaryEncoded()) {
+ Preconditions.checkState(creator.isDictionaryEncoded(), "Cannot change dictionary based forward index to raw "
+ + "forward index without providing dictionary reader for column: %s", column);
+ // Read dictionary based forward index and write dictionary based forward index.
+ forwardIndexReadDictWriteDictHelper(reader, creator, numDocs);
+ } else {
+ Preconditions.checkState(!creator.isDictionaryEncoded(), "Cannot change raw forward index to dictionary based "
+ + "forward index without providing dictionary creator for column: %s", column);
+ // Read raw forward index and write raw forward index.
+ forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs);
+ }
+ } else if (dictionaryCreator == null) {
// Read dictionary based forward index and write raw forward index.
forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryReader);
} else if (dictionaryReader == null) {
// Read raw forward index and write dictionary based forward index.
forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryCreator);
} else {
- Preconditions.checkState(false, "Invalid dict-based read/write for column=" + column);
+ throw new IllegalStateException("One of dictionary reader or creator should be null for column: " + column);
+ }
+ }
+
+ private <C extends ForwardIndexReaderContext> void forwardIndexReadDictWriteDictHelper(ForwardIndexReader<C> reader,
+ ForwardIndexCreator creator, int numDocs) {
+ C readerContext = reader.createContext();
+ if (reader.isSingleValue()) {
+ for (int i = 0; i < numDocs; i++) {
+ creator.putDictId(reader.getDictId(i, readerContext));
+ }
+ } else {
+ for (int i = 0; i < numDocs; i++) {
+ creator.putDictIdMV(reader.getDictIdMV(i, readerContext));
+ }
}
}
@@ -856,11 +849,10 @@ public class ForwardIndexHandler extends BaseIndexHandler {
DictionaryIndexConfig dictConf = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.dictionary());
- boolean useVarLength = dictConf.getUseVarLengthDictionary()
- || DictionaryIndexType.shouldUseVarLengthDictionary(reader.getStoredType(), statsCollector);
- SegmentDictionaryCreator dictionaryCreator =
- new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
- _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength);
+ boolean useVarLength = dictConf.getUseVarLengthDictionary() || DictionaryIndexType.shouldUseVarLengthDictionary(
+ reader.getStoredType(), statsCollector);
+ SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
+ _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength);
dictionaryCreator.build(statsCollector.getUniqueValuesSet());
return dictionaryCreator;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index bdd54224e0..f7f185cc65 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -35,7 +35,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
@@ -49,6 +48,7 @@ import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.JsonIndexConfig;
@@ -93,7 +93,7 @@ public class IndexLoadingConfig {
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private boolean _enableDefaultStarTree;
- private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>();
+ private Map<String, CompressionCodec> _compressionConfigs = new HashMap<>();
private Map<String, FieldIndexConfigs> _indexConfigsByColName = new HashMap<>();
private SegmentVersion _segmentVersion;
@@ -299,8 +299,8 @@ public class IndexLoadingConfig {
+ "indexLoadingConfig for indexType: {}", _schema == null, _tableConfig == null, indexType);
deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig);
} else if (_segmentTier == null) {
- deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig)
- .withFallbackAlternative(stdDeserializer);
+ deserializer =
+ IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig).withFallbackAlternative(stdDeserializer);
} else {
// No need to fall back to fromIndexLoadingConfig which contains index configs for default tier, when looking
// for tier specific index configs.
@@ -352,8 +352,7 @@ public class IndexLoadingConfig {
for (FieldConfig fieldConfig : fieldConfigList) {
String column = fieldConfig.getName();
if (fieldConfig.getCompressionCodec() != null) {
- ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name());
- _compressionConfigs.put(column, compressionType);
+ _compressionConfigs.put(column, fieldConfig.getCompressionCodec());
}
}
}
@@ -613,7 +612,7 @@ public class IndexLoadingConfig {
* Used by segmentPreProcessorTest to set compression configs.
*/
@VisibleForTesting
- public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) {
+ public void setCompressionConfigs(Map<String, CompressionCodec> compressionConfigs) {
_compressionConfigs = new HashMap<>(compressionConfigs);
_dirty = true;
}
@@ -750,7 +749,7 @@ public class IndexLoadingConfig {
*
* @return a map containing column name as key and compressionType as value.
*/
- public Map<String, ChunkCompressionType> getCompressionConfigs() {
+ public Map<String, CompressionCodec> getCompressionConfigs() {
return unmodifiable(_compressionConfigs);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index acfd4827c6..09643966c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -36,6 +36,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator;
import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -48,6 +49,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreInd
import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -55,8 +57,11 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -780,8 +785,18 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
}
}
} else {
- try (ForwardIndexCreator forwardIndexCreator = new MultiValueUnsortedForwardIndexCreator(_indexDir, column,
- cardinality, numDocs, indexCreationInfo.getTotalNumberOfEntries())) {
+ DictIdCompressionType dictIdCompressionType = null;
+ FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column);
+ if (fieldIndexConfig != null) {
+ ForwardIndexConfig forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
+ if (forwardIndexConfig != null) {
+ dictIdCompressionType = forwardIndexConfig.getDictIdCompressionType();
+ }
+ }
+ try (ForwardIndexCreator forwardIndexCreator = dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT
+ ? new MultiValueEntryDictForwardIndexCreator(_indexDir, column, cardinality, numDocs)
+ : new MultiValueUnsortedForwardIndexCreator(_indexDir, column, cardinality, numDocs,
+ indexCreationInfo.getTotalNumberOfEntries())) {
for (int i = 0; i < numDocs; i++) {
forwardIndexCreator.putDictIdMV(dictionaryCreator.indexOfMV(outputValues[i]));
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java
new file mode 100644
index 0000000000..05c4000903
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Bit-compressed dictionary-encoded forward index reader for multi-value columns, where a second-level dictionary
+ * encoding for multi-value entries (instead of individual values within the entry) is maintained within the forward
+ * index.
+ * See {@link FixedBitMVEntryDictForwardIndexWriter} for index layout.
+ */
+public final class FixedBitMVEntryDictForwardIndexReader implements ForwardIndexReader<ForwardIndexReaderContext> {
+ public static final int MAGIC_MARKER = FixedBitMVEntryDictForwardIndexWriter.MAGIC_MARKER;
+ public static final short VERSION = FixedBitMVEntryDictForwardIndexWriter.VERSION;
+ private static final int HEADER_SIZE = FixedBitMVEntryDictForwardIndexWriter.HEADER_SIZE;
+
+ private final FixedBitIntReaderWriter _idReader;
+ private final int _offsetBufferOffset;
+ private final FixedBitIntReaderWriter _offsetReader;
+ private final int _valueBufferOffset;
+ private final FixedBitIntReaderWriter _valueReader;
+
+ public FixedBitMVEntryDictForwardIndexReader(PinotDataBuffer dataBuffer, int numDocs, int numBitsPerValue) {
+ int magicMarker = dataBuffer.getInt(0);
+ Preconditions.checkState(magicMarker == MAGIC_MARKER, "Invalid magic marker: %s (expected: %s)", magicMarker,
+ MAGIC_MARKER);
+ short version = dataBuffer.getShort(4);
+ Preconditions.checkState(version == VERSION, "Invalid version: %s (expected: %s)", version, VERSION);
+ int numBitsPerValueInHeader = dataBuffer.getByte(6);
+ Preconditions.checkState(numBitsPerValueInHeader == numBitsPerValue, "Invalid numBitsPerValue: %s (expected: %s)",
+ numBitsPerValueInHeader, numBitsPerValue);
+ int numBitsPerId = dataBuffer.getByte(7);
+ int numUniqueEntries = dataBuffer.getInt(8);
+ int numTotalValues = dataBuffer.getInt(12);
+ _offsetBufferOffset = dataBuffer.getInt(16);
+ _valueBufferOffset = dataBuffer.getInt(20);
+ _idReader = new FixedBitIntReaderWriter(dataBuffer.view(HEADER_SIZE, _offsetBufferOffset), numDocs, numBitsPerId);
+ _offsetReader =
+ new FixedBitIntReaderWriter(dataBuffer.view(_offsetBufferOffset, _valueBufferOffset), numUniqueEntries + 1,
+ PinotDataBitSet.getNumBitsPerValue(numTotalValues));
+ _valueReader = new FixedBitIntReaderWriter(dataBuffer.view(_valueBufferOffset, dataBuffer.size()), numTotalValues,
+ numBitsPerValue);
+ }
+
+ @Override
+ public boolean isDictionaryEncoded() {
+ return true;
+ }
+
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ @Override
+ public DataType getStoredType() {
+ return DataType.INT;
+ }
+
+ @Override
+ public DictIdCompressionType getDictIdCompressionType() {
+ return DictIdCompressionType.MV_ENTRY_DICT;
+ }
+
+ @Override
+ public int getDictIdMV(int docId, int[] dictIdBuffer, ForwardIndexReaderContext context) {
+ int id = _idReader.readInt(docId);
+ int startIndex = _offsetReader.readInt(id);
+ int numValues = _offsetReader.readInt(id + 1) - startIndex;
+ _valueReader.readInt(startIndex, numValues, dictIdBuffer);
+ return numValues;
+ }
+
+ @Override
+ public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+ int id = _idReader.readInt(docId);
+ int startIndex = _offsetReader.readInt(id);
+ int numValues = _offsetReader.readInt(id + 1) - startIndex;
+ int[] dictIdBuffer = new int[numValues];
+ _valueReader.readInt(startIndex, numValues, dictIdBuffer);
+ return dictIdBuffer;
+ }
+
+ @Override
+ public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+ int id = _idReader.readInt(docId);
+ return _offsetReader.readInt(id + 1) - _offsetReader.readInt(id);
+ }
+
+ @Override
+ public boolean isBufferByteRangeInfoSupported() {
+ return true;
+ }
+
+  @Override
+  public void recordDocIdByteRanges(int docId, ForwardIndexReaderContext context, List<ByteRange> ranges) {
+    // Id section: the bytes holding this doc's entry id (offsets are relative to the start of the data buffer)
+    int id = _idReader.readInt(docId);
+    int idStart = _idReader.getStartByteOffset(docId);
+    ranges.add(new ByteRange(HEADER_SIZE + idStart, _idReader.getEndByteOffset(docId) - idStart));
+    // Offset section: the two adjacent offsets (id, id + 1) that delimit the entry within the value section
+    int startIndex = _offsetReader.readInt(id);
+    int numValues = _offsetReader.readInt(id + 1) - startIndex;
+    int offsetStart = _offsetReader.getStartByteOffset(id);
+    int offsetEnd = _offsetReader.getEndByteOffset(id + 1);
+    ranges.add(new ByteRange(_offsetBufferOffset + offsetStart, offsetEnd - offsetStart));
+    // Value section: an empty entry reads no value bytes, so only record a range when there is at least one value
+    if (numValues > 0) {
+      int valueStart = _valueReader.getStartByteOffset(startIndex);
+      int valueEnd = _valueReader.getEndByteOffset(startIndex + numValues - 1);
+      ranges.add(new ByteRange(_valueBufferOffset + valueStart, valueEnd - valueStart));
+    }
+  }
+
+ @Override
+ public boolean isFixedOffsetMappingType() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible for closing the PinotDataBuffer.
+ _idReader.close();
+ _offsetReader.close();
+ _valueReader.close();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ebea84c455..8900b6f36f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -51,6 +51,8 @@ import org.apache.pinot.segment.spi.index.IndexService;
import org.apache.pinot.segment.spi.index.IndexType;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
+import org.apache.pinot.spi.config.table.FieldConfig.EncodingType;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -943,8 +945,8 @@ public final class TableConfigUtils {
* Also ensures proper dependency between index types (eg: Inverted Index columns
* cannot be present in no-dictionary columns).
*/
- private static void validateIndexingConfig(@Nullable IndexingConfig indexingConfig, @Nullable Schema schema) {
- if (indexingConfig == null || schema == null) {
+ private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nullable Schema schema) {
+ if (schema == null) {
return;
}
ArrayListMultimap<String, String> columnNameToConfigMap = ArrayListMultimap.create();
@@ -1115,59 +1117,69 @@ public final class TableConfigUtils {
* Validates index compatibility for forward index disabled columns
*/
private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList,
- @Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) {
- if (fieldConfigList == null || schema == null) {
+ IndexingConfig indexingConfig, @Nullable Schema schema) {
+ if (fieldConfigList == null) {
return;
}
+ assert indexingConfig != null;
for (FieldConfig fieldConfig : fieldConfigList) {
String columnName = fieldConfig.getName();
- FieldSpec fieldConfigColSpec = schema.getFieldSpecFor(columnName);
- Preconditions.checkState(fieldConfigColSpec != null,
- "Column Name " + columnName + " defined in field config list must be a valid column defined in the schema");
-
- if (indexingConfigs != null) {
- List<String> noDictionaryColumns = indexingConfigs.getNoDictionaryColumns();
- switch (fieldConfig.getEncodingType()) {
- case DICTIONARY:
- if (noDictionaryColumns != null) {
- Preconditions.checkArgument(!noDictionaryColumns.contains(columnName),
- "FieldConfig encoding type is different from indexingConfig for column: " + columnName);
- }
- Preconditions.checkArgument(fieldConfig.getCompressionCodec() == null,
- "Set compression codec to null for dictionary encoding type");
- break;
- default:
- break;
- }
+ EncodingType encodingType = fieldConfig.getEncodingType();
+ Preconditions.checkArgument(encodingType != null, "Encoding type must be specified for column: %s", columnName);
+ CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
+ switch (encodingType) {
+ case RAW:
+ Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex(),
+ "Compression codec: %s is not applicable to raw index", compressionCodec);
+ break;
+ case DICTIONARY:
+ Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex(),
+ "Compression codec: %s is not applicable to dictionary encoded index", compressionCodec);
+ List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
+ Preconditions.checkArgument(noDictionaryColumns == null || !noDictionaryColumns.contains(columnName),
+ "FieldConfig encoding type is different from indexingConfig for column: %s", columnName);
+ Map<String, String> noDictionaryConfig = indexingConfig.getNoDictionaryConfig();
+ Preconditions.checkArgument(noDictionaryConfig == null || !noDictionaryConfig.containsKey(columnName),
+ "FieldConfig encoding type is different from indexingConfig for column: %s", columnName);
+ break;
+ default:
+ break;
+ }
- // Validate the forward index disabled compatibility with other indexes if enabled for this column
- validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfigs, noDictionaryColumns,
- schema);
+ if (schema == null) {
+ return;
}
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ Preconditions.checkState(fieldSpec != null,
+ "Column: %s defined in field config list must be a valid column defined in the schema", columnName);
+
+ // Validate the forward index disabled compatibility with other indexes if enabled for this column
+ validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfig, schema);
if (CollectionUtils.isNotEmpty(fieldConfig.getIndexTypes())) {
for (FieldConfig.IndexType indexType : fieldConfig.getIndexTypes()) {
switch (indexType) {
- case FST:
- Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
- "FST Index is only enabled on dictionary encoded columns");
- Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
- && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
- "FST Index is only supported for single value string columns");
- break;
case INVERTED:
- Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
- "Cannot create an Inverted Index on column: " + fieldConfig.getName() + ", specified as "
- + "a non dictionary column");
+ Preconditions.checkState(fieldConfig.getEncodingType() == EncodingType.DICTIONARY,
+ "Cannot create inverted index on column: %s, it can only be applied to dictionary encoded columns",
+ columnName);
break;
case TEXT:
- Preconditions.checkState(fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
- "TEXT Index is only supported for string columns");
+ Preconditions.checkState(fieldSpec.getDataType().getStoredType() == DataType.STRING,
+ "Cannot create text index on column: %s, it can only be applied to string columns", columnName);
+ break;
+ case FST:
+ Preconditions.checkState(
+ fieldConfig.getEncodingType() == EncodingType.DICTIONARY && fieldSpec.isSingleValueField()
+ && fieldSpec.getDataType().getStoredType() == DataType.STRING,
+ "Cannot create FST index on column: %s, it can only be applied to dictionary encoded single value "
+ + "string columns", columnName);
break;
case TIMESTAMP:
- Preconditions.checkState(fieldConfigColSpec.getDataType() == DataType.TIMESTAMP,
- "TIMESTAMP Index is only supported for timestamp columns");
+ Preconditions.checkState(fieldSpec.getDataType() == DataType.TIMESTAMP,
+ "Cannot create timestamp index on column: %s, it can only be applied to timestamp columns",
+ columnName);
break;
default:
break;
@@ -1189,7 +1201,7 @@ public final class TableConfigUtils {
* back or generate a new index for existing segments is to either refresh or back-fill the segments.
*/
private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig,
- IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) {
+ IndexingConfig indexingConfig, Schema schema) {
Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
if (fieldConfigProperties == null) {
return;
@@ -1204,26 +1216,24 @@ public final class TableConfigUtils {
FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
// Check for the range index since the index itself relies on the existence of the forward index to work.
- if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) {
+ if (indexingConfig.getRangeIndexColumns() != null && indexingConfig.getRangeIndexColumns().contains(columnName)) {
Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("Feature not supported for multi-value "
+ "columns with range index. Cannot disable forward index for column %s. Disable range index on this "
+ "column to use this feature", columnName));
- Preconditions.checkState(indexingConfigs.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+ Preconditions.checkState(indexingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable "
+ "forward index for column %s. Either disable range index or create range index with"
+ " version >= 2 to use this feature", columnName));
}
- Preconditions.checkState(
- !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format(
+ Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(),
+ String.format(
"Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
+ " not supported with forward index for column: %s, disabled", columnName));
- boolean hasDictionary =
- fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null
- || !noDictionaryColumns.contains(columnName);
+ boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY;
boolean hasInvertedIndex =
- indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns()
+ indexingConfig.getInvertedIndexColumns() != null && indexingConfig.getInvertedIndexColumns()
.contains(columnName);
if (!hasDictionary || !hasInvertedIndex) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java
new file mode 100644
index 0000000000..12d3b2c352
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class FixedBitMVEntryDictForwardIndexTest {
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "FixedBitMVEntryDictForwardIndexTest");
+ private static final File INDEX_FILE =
+ new File(TEMP_DIR, "testColumn" + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+ private static final int NUM_DOCS = 1000;
+ private static final int MAX_NUM_VALUES_PER_MV_ENTRY = 3;
+ private static final Random RANDOM = new Random();
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ FileUtils.forceMkdir(TEMP_DIR);
+ }
+
+ @Test
+ public void testRandomGeneratedValues()
+ throws Exception {
+ for (int numBitsPerValue = 1; numBitsPerValue <= 31; numBitsPerValue++) {
+ // Generate random values
+ int[][] valuesArray = new int[NUM_DOCS][];
+ int maxValue = numBitsPerValue != 31 ? 1 << numBitsPerValue : Integer.MAX_VALUE;
+ for (int i = 0; i < NUM_DOCS; i++) {
+ int numValues = RANDOM.nextInt(MAX_NUM_VALUES_PER_MV_ENTRY + 1);
+ int[] values = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = RANDOM.nextInt(maxValue);
+ }
+ valuesArray[i] = values;
+ }
+
+ // Create the forward index
+ try (
+ FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS,
+ numBitsPerValue)) {
+ for (int[] values : valuesArray) {
+ writer.putDictIds(values);
+ }
+ }
+
+ // Read the forward index
+ try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE);
+ FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS,
+ numBitsPerValue)) {
+ int[] valueBuffer = new int[MAX_NUM_VALUES_PER_MV_ENTRY];
+ for (int i = 0; i < NUM_DOCS; i++) {
+ int numValues = reader.getDictIdMV(i, valueBuffer, null);
+ assertEquals(numValues, valuesArray[i].length);
+ for (int j = 0; j < numValues; j++) {
+ assertEquals(valueBuffer[j], valuesArray[i][j]);
+ }
+ }
+ }
+
+ FileUtils.forceDelete(INDEX_FILE);
+ }
+ }
+
+ @Test
+ public void testAllEmptyValues()
+ throws Exception {
+ // Create the forward index
+ try (FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS,
+ 1)) {
+ int[] value = new int[0];
+ for (int i = 0; i < NUM_DOCS; i++) {
+ writer.putDictIds(value);
+ }
+ }
+
+ // Read the forward index
+ try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE);
+ FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS,
+ 1)) {
+ int[] valueBuffer = new int[0];
+ for (int i = 0; i < NUM_DOCS; i++) {
+ assertEquals(reader.getDictIdMV(i, valueBuffer, null), 0);
+ }
+ }
+
+ FileUtils.forceDelete(INDEX_FILE);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index 2f438a5b6a..c72a95a4e9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.spi.config.table.FieldConfig;
@@ -162,29 +163,17 @@ public class ForwardIndexTypeTest {
}
@Test
- public void oldConfEnableDictWithSnappyCompression()
+ public void oldConfEnableDictWithMVEntryDictFormat()
throws IOException {
addFieldIndexConfig(""
- + " {\n"
- + " \"name\": \"dimInt\","
- + " \"encodingType\": \"DICTIONARY\",\n"
- + " \"compressionCodec\": \"SNAPPY\"\n"
- + " }"
+ + "{"
+ + " \"name\": \"dimInt\","
+ + " \"encodingType\": \"DICTIONARY\","
+ + " \"compressionCodec\": \"MV_ENTRY_DICT\""
+ + "}"
);
- assertEquals(ForwardIndexConfig.DEFAULT);
- }
-
- @Test
- public void oldConfEnableDictWithLZ4Compression()
- throws IOException {
- addFieldIndexConfig(""
- + " {\n"
- + " \"name\": \"dimInt\","
- + " \"encodingType\": \"DICTIONARY\",\n"
- + " \"compressionCodec\": \"LZ4\"\n"
- + " }"
- );
- assertEquals(ForwardIndexConfig.DEFAULT);
+ assertEquals(
+ new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
}
@Test
@@ -197,13 +186,7 @@ public class ForwardIndexTypeTest {
+ " }"
);
- assertEquals(
- new ForwardIndexConfig.Builder()
- .withCompressionType(null)
- .withDeriveNumDocsPerChunk(false)
- .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
- .build()
- );
+ assertEquals(ForwardIndexConfig.DEFAULT);
}
@Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
@@ -269,7 +252,7 @@ public class ForwardIndexTypeTest {
}
@Test
- public void newConfigDisabled2()
+ public void newConfigDisabled()
throws IOException {
addFieldIndexConfig("{\n"
+ " \"name\": \"dimInt\",\n"
@@ -296,6 +279,23 @@ public class ForwardIndexTypeTest {
assertEquals(ForwardIndexConfig.DEFAULT);
}
+ @Test
+ public void newConfigMVEntryDictFormat()
+ throws IOException {
+ addFieldIndexConfig(""
+ + "{"
+ + " \"name\": \"dimInt\","
+ + " \"indexes\" : {"
+ + " \"forward\": {"
+ + " \"dictIdCompressionType\": \"MV_ENTRY_DICT\""
+ + " }"
+ + " }"
+ + "}"
+ );
+ assertEquals(
+ new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
+ }
+
@Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
public void newConfigEnabled(String compression)
throws IOException {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 2800cf9053..6077ca1625 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,7 @@ import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
import org.apache.pinot.segment.spi.index.IndexType;
@@ -53,6 +55,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -70,10 +73,7 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
public class ForwardIndexHandlerTest {
@@ -104,7 +104,6 @@ public class ForwardIndexHandlerTest {
// Sorted columns
private static final String DIM_RAW_SORTED_INTEGER = "DIM_RAW_SORTED_INTEGER";
-
// Metric columns
private static final String METRIC_PASS_THROUGH_INTEGER = "METRIC_PASS_THROUGH_INTEGER";
private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
@@ -191,6 +190,9 @@ public class ForwardIndexHandlerTest {
DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, DIM_DICT_MV_STRING,
DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG);
+ private static final List<String> DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX =
+ Arrays.asList(DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG, DIM_DICT_MV_STRING, DIM_DICT_MV_BYTES);
+
private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = Arrays.asList(
DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_SV_FORWARD_INDEX_DISABLED_LONG, DIM_SV_FORWARD_INDEX_DISABLED_STRING,
DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
@@ -206,14 +208,16 @@ public class ForwardIndexHandlerTest {
private static final List<String> FORWARD_INDEX_DISABLED_RAW_COLUMNS =
Arrays.asList(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+ private static final List<CompressionCodec> RAW_COMPRESSION_TYPES =
+ Arrays.stream(CompressionCodec.values()).filter(CompressionCodec::isApplicableToRawIndex)
+ .collect(Collectors.toList());
+
private final List<String> _noDictionaryColumns = new ArrayList<>();
private final List<String> _forwardIndexDisabledColumns = new ArrayList<>();
private final List<String> _invertedIndexColumns = new ArrayList<>();
TableConfig _tableConfig;
Schema _schema;
File _segmentDirectory;
- private List<FieldConfig.CompressionCodec> _allCompressionTypes =
- Arrays.asList(FieldConfig.CompressionCodec.values());
@BeforeMethod
public void setUp()
@@ -242,27 +246,27 @@ public class ForwardIndexHandlerTest {
for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.SNAPPY, null));
+ CompressionCodec.SNAPPY, null));
}
for (String indexColumn : RAW_SORTED_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW,
- Collections.singletonList(FieldConfig.IndexType.SORTED), FieldConfig.CompressionCodec.SNAPPY, null));
+ Collections.singletonList(FieldConfig.IndexType.SORTED), CompressionCodec.SNAPPY, null));
}
for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.ZSTANDARD, null));
+ CompressionCodec.ZSTANDARD, null));
}
for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.PASS_THROUGH, null));
+ CompressionCodec.PASS_THROUGH, null));
}
for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.LZ4, null));
+ CompressionCodec.LZ4, null));
}
for (String indexColumn : SV_FORWARD_INDEX_DISABLED_COLUMNS) {
@@ -284,8 +288,8 @@ public class ForwardIndexHandlerTest {
}
for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) {
- fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW,
- Collections.emptyList(), FieldConfig.CompressionCodec.LZ4,
+ fieldConfigs.add(
+ new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.LZ4,
Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
}
@@ -696,8 +700,9 @@ public class ForwardIndexHandlerTest {
|| DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
|| DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
FieldConfig config = fieldConfigs.remove(randIdx);
- FieldConfig.CompressionCodec newCompressionType = null;
- for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+ CompressionCodec newCompressionType = null;
+ for (CompressionCodec type : CompressionCodec.values()) {
+
if (config.getCompressionCodec() != type) {
newCompressionType = type;
break;
@@ -717,7 +722,7 @@ public class ForwardIndexHandlerTest {
Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
assertEquals(operationMap.size(), 1);
assertEquals(operationMap.get(config.getName()),
- Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+ Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
// TEST2: Change compression and add index. Change compressionType for more than 1 column.
fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
@@ -725,10 +730,10 @@ public class ForwardIndexHandlerTest {
FieldConfig config2 = fieldConfigs.remove(1);
FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.ZSTANDARD, null);
+ CompressionCodec.ZSTANDARD, null);
fieldConfigs.add(newConfig1);
FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
- FieldConfig.CompressionCodec.ZSTANDARD, null);
+ CompressionCodec.ZSTANDARD, null);
fieldConfigs.add(newConfig2);
tableConfig =
@@ -743,9 +748,9 @@ public class ForwardIndexHandlerTest {
operationMap = fwdIndexHandler.computeOperations(writer);
assertEquals(operationMap.size(), 2);
assertEquals(operationMap.get(config1.getName()),
- Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+ Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
assertEquals(operationMap.get(config2.getName()),
- Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+ Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
// Tear down
segmentLocalFSDirectory.close();
@@ -963,8 +968,8 @@ public class ForwardIndexHandlerTest {
name = fieldConfigs.get(randIdx).getName();
} while (!SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) && !MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name));
FieldConfig config = fieldConfigs.remove(randIdx);
- FieldConfig.CompressionCodec newCompressionType = null;
- for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+ CompressionCodec newCompressionType = null;
+ for (CompressionCodec type : RAW_COMPRESSION_TYPES) {
if (config.getCompressionCodec() != type) {
newCompressionType = type;
break;
@@ -1083,7 +1088,7 @@ public class ForwardIndexHandlerTest {
continue;
}
// For every noDictionaryColumn, change the compressionType to all available types, one by one.
- for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
+ for (CompressionCodec compressionType : RAW_COMPRESSION_TYPES) {
// Setup
SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
SegmentDirectory segmentLocalFSDirectory =
@@ -1100,10 +1105,9 @@ public class ForwardIndexHandlerTest {
}
FieldConfig config = fieldConfigs.remove(index);
String columnName = config.getName();
- FieldConfig.CompressionCodec newCompressionType = compressionType;
FieldConfig newConfig =
- new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType,
+ new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType,
null);
fieldConfigs.add(newConfig);
@@ -1125,7 +1129,7 @@ public class ForwardIndexHandlerTest {
ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
testIndexExists(columnName, StandardIndexes.forward());
validateIndexMap(columnName, false, false);
- validateForwardIndex(columnName, newCompressionType, metadata.isSorted());
+ validateForwardIndex(columnName, compressionType, metadata.isSorted());
// Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType
// change.
@@ -1138,6 +1142,79 @@ public class ForwardIndexHandlerTest {
}
}
+ @Test
+ public void testChangeDictCompression()
+ throws Exception { // round trip: regular dict-encoded MV forward index -> MV_ENTRY_DICT -> regular
+ List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+
+ // Change to MV_ENTRY_DICT compression, one column per iteration
+ for (String column : DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX) {
+ FieldConfig newFieldConfig = new FieldConfig(column, FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+ CompressionCodec.MV_ENTRY_DICT, null);
+ fieldConfigs.add(newFieldConfig); // list accumulates, so columns from earlier iterations keep MV_ENTRY_DICT
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+ ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) {
+ ForwardIndexHandler forwardIndexHandler =
+ new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null);
+
+ Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer);
+ assertEquals(operations, Collections.singletonMap(column,
+ Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)));
+ assertTrue(forwardIndexHandler.needUpdateIndices(writer));
+
+ forwardIndexHandler.updateIndices(writer); // expected to rewrite the forward index; checked by the read below
+ forwardIndexHandler.postUpdateIndicesCleanup(writer);
+ }
+
+ segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); // fresh metadata instance for the read-back
+ try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+ ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) {
+ ForwardIndexReader<?> forwardIndexReader =
+ ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column));
+ assertTrue(forwardIndexReader.isDictionaryEncoded());
+ assertFalse(forwardIndexReader.isSingleValue());
+ assertEquals(forwardIndexReader.getDictIdCompressionType(), DictIdCompressionType.MV_ENTRY_DICT);
+ }
+ }
+
+ // Change back to regular forward index by dropping the MV_ENTRY_DICT field configs, last added first
+ for (int i = 0; i < DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX.size(); i++) {
+ FieldConfig fieldConfig = fieldConfigs.remove(fieldConfigs.size() - 1); // undo the most recent override
+ String column = fieldConfig.getName();
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+
+ SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+ try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+ ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) {
+ ForwardIndexHandler forwardIndexHandler =
+ new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null);
+
+ Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer);
+ assertEquals(operations, Collections.singletonMap(column,
+ Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)));
+ assertTrue(forwardIndexHandler.needUpdateIndices(writer));
+
+ forwardIndexHandler.updateIndices(writer);
+ forwardIndexHandler.postUpdateIndicesCleanup(writer);
+ }
+
+ segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); // fresh metadata instance for the read-back
+ try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+ ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) {
+ ForwardIndexReader<?> forwardIndexReader =
+ ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column));
+ assertTrue(forwardIndexReader.isDictionaryEncoded());
+ assertFalse(forwardIndexReader.isSingleValue());
+ assertNull(forwardIndexReader.getDictIdCompressionType()); // null is expected once the codec is removed
+ }
+ }
+ }
+
@Test
public void testChangeCompressionForMultipleColumns()
throws Exception {
@@ -1149,8 +1226,8 @@ public class ForwardIndexHandlerTest {
List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
Random rand = new Random();
- int randomIdx = rand.nextInt(_allCompressionTypes.size());
- FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx);
+ int randomIdx = rand.nextInt(RAW_COMPRESSION_TYPES.size());
+ CompressionCodec newCompressionType = RAW_COMPRESSION_TYPES.get(randomIdx);
// Column 1
String name;
@@ -1497,7 +1574,7 @@ public class ForwardIndexHandlerTest {
testIndexExists(column, StandardIndexes.forward());
validateIndexMap(column, false, false);
// All the columns are dimensions. So default compression type is LZ4.
- validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1538,7 +1615,7 @@ public class ForwardIndexHandlerTest {
testIndexExists(column1, StandardIndexes.forward());
validateIndexMap(column1, false, false);
// All the columns are dimensions. So default compression type is LZ4.
- validateForwardIndex(column1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(column1, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
validateMetadataProperties(column1, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1551,7 +1628,7 @@ public class ForwardIndexHandlerTest {
testIndexExists(column2, StandardIndexes.forward());
validateIndexMap(column2, false, false);
// All the columns are dimensions. So default compression type is LZ4.
- validateForwardIndex(column2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(column2, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
validateMetadataProperties(column2, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1940,7 +2017,7 @@ public class ForwardIndexHandlerTest {
// Col1 validation.
ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(col1);
validateIndexMap(col1, false, false);
- validateForwardIndex(col1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(col1, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing should change.
validateMetadataProperties(col1, false, 0, metadata.getCardinality(),
metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(),
@@ -1950,7 +2027,7 @@ public class ForwardIndexHandlerTest {
// Col2 validation.
metadata = existingSegmentMetadata.getColumnMetadataFor(col2);
validateIndexMap(col2, false, false);
- validateForwardIndex(col2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(col2, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing should change.
validateMetadataProperties(col2, false, 0, metadata.getCardinality(),
metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(),
@@ -1985,7 +2062,7 @@ public class ForwardIndexHandlerTest {
// Column validation.
ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
validateIndexMap(column, false, false);
- validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, some values can change since MV columns with duplicates lose the duplicates on forward index
// regeneration.
validateMetadataProperties(column, false, 0, metadata.getCardinality(),
@@ -2030,7 +2107,7 @@ public class ForwardIndexHandlerTest {
ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
validateIndexMap(column, false, false);
- validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+ validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
// In column metadata, nothing should change.
validateMetadataProperties(column, false, 0,
@@ -2308,7 +2385,7 @@ public class ForwardIndexHandlerTest {
}
}
- private void validateForwardIndex(String columnName, @Nullable FieldConfig.CompressionCodec expectedCompressionType,
+ private void validateForwardIndex(String columnName, @Nullable CompressionCodec expectedCompressionType,
boolean isSorted)
throws IOException {
// Setup
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8e549d2ba3..3cc14682bc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -656,9 +657,8 @@ public class SegmentPreProcessorTest {
@Test
public void testForwardIndexHandlerChangeCompression()
throws Exception {
- Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>();
- ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
- compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ Map<String, CompressionCodec> compressionConfigs = new HashMap<>();
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
_indexLoadingConfig.addNoDictionaryColumns(EXISTING_STRING_COL_RAW);
@@ -672,12 +672,11 @@ public class SegmentPreProcessorTest {
new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
// Test2: Now forward index will be rewritten with ZSTANDARD compressionType.
- checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType, true,
- 0, DataType.STRING, 100000);
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0,
+ ChunkCompressionType.ZSTANDARD, true, 0, DataType.STRING, 100000);
// Test3: Change compression on existing raw index column. Also add text index on same column. Check correctness.
- newCompressionType = ChunkCompressionType.SNAPPY;
- compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.SNAPPY);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
Set<String> textIndexColumns = new HashSet<>();
textIndexColumns.add(EXISTING_STRING_COL_RAW);
@@ -688,12 +687,11 @@ public class SegmentPreProcessorTest {
ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
assertNotNull(columnMetadata);
checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
- validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
- 0, newCompressionType, false, DataType.STRING, 100000);
+ validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0,
+ ChunkCompressionType.SNAPPY, false, DataType.STRING, 100000);
// Test4: Change compression on RAW index column. Change another index on another column. Check correctness.
- newCompressionType = ChunkCompressionType.ZSTANDARD;
- compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+ compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
Set<String> fstColumns = new HashSet<>();
fstColumns.add(EXISTING_STRING_COL_DICT);
@@ -706,12 +704,11 @@ public class SegmentPreProcessorTest {
// Check FST index
checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
// Check forward index.
- validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
- 0, newCompressionType, false, DataType.STRING, 100000);
+ validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0,
+ ChunkCompressionType.ZSTANDARD, false, DataType.STRING, 100000);
// Test5: Change compressionType for an MV column
- newCompressionType = ChunkCompressionType.ZSTANDARD;
- compressionConfigs.put(EXISTING_INT_COL_RAW_MV, newCompressionType);
+ compressionConfigs.put(EXISTING_INT_COL_RAW_MV, CompressionCodec.ZSTANDARD);
_indexLoadingConfig.setCompressionConfigs(compressionConfigs);
_indexLoadingConfig.addNoDictionaryColumns(EXISTING_INT_COL_RAW_MV);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 7653ff7636..9c9b2cdf43 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -706,9 +707,8 @@ public class TableConfigUtilsTest {
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m");
streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
- tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setIngestionConfig(ingestionConfig).build();
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validate(tableConfig, schema);
@@ -720,9 +720,8 @@ public class TableConfigUtilsTest {
// When size based threshold is specified, rows has to be set to 0.
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000");
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
- tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validate(tableConfig, schema);
@@ -734,9 +733,8 @@ public class TableConfigUtilsTest {
// When size based threshold is specified, rows has to be set to 0.
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0");
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
- tableConfig =
- new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validate(tableConfig, schema);
@@ -1030,7 +1028,9 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since FST index is enabled on RAW encoding type");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create FST index on column: myCol1, it can only be applied to dictionary encoded single value "
+ + "string columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -1041,7 +1041,9 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since FST index is enabled on multi value column");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value "
+ + "string columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -1052,7 +1054,9 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since FST index is enabled on non String column");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create FST index on column: intCol, it can only be applied to dictionary encoded single value "
+ + "string columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1064,7 +1068,8 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail since TEXT index is enabled on non String column");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "TEXT Index is only supported for string columns");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create text index on column: intCol, it can only be applied to string columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1077,29 +1082,29 @@ public class TableConfigUtilsTest {
Assert.fail("Should fail since field name is not present in schema");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(),
- "Column Name myCol21 defined in field config list must be a valid column defined in the schema");
+ "Column: myCol21 defined in field config list must be a valid column defined in the schema");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
try {
FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
- FieldConfig.CompressionCodec.SNAPPY, null);
+ CompressionCodec.SNAPPY, null);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
- Assert.fail("Should fail since dictionary encoding does not support compression codec snappy");
+ Assert.fail("Should fail since dictionary encoding does not support compression codec SNAPPY");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
+ Assert.assertEquals(e.getMessage(), "Compression codec: SNAPPY is not applicable to dictionary encoded index");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
try {
- FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
- FieldConfig.CompressionCodec.ZSTANDARD, null);
+ FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+ CompressionCodec.MV_ENTRY_DICT, null);
tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
TableConfigUtils.validate(tableConfig, schema);
- Assert.fail("Should fail since dictionary encoding does not support compression codec zstandard");
+ Assert.fail("Should fail since raw encoding does not support compression codec MV_ENTRY_DICT");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
+ Assert.assertEquals(e.getMessage(), "Compression codec: MV_ENTRY_DICT is not applicable to raw index");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1227,7 +1232,7 @@ public class TableConfigUtilsTest {
Assert.fail("Should not be able to disable dictionary but keep inverted index");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(),
- "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config");
+ "Cannot create an Inverted index on column myCol2 specified in the noDictionaryColumns config");
}
// Tests the case when the field-config list marks a column as raw (non-dictionary) and enables
@@ -1242,7 +1247,7 @@ public class TableConfigUtilsTest {
Assert.fail("Should not be able to disable dictionary but keep inverted index");
} catch (Exception e) {
Assert.assertEquals(e.getMessage(),
- "Cannot create an Inverted Index on column: myCol2, specified as a non dictionary column");
+ "Cannot create inverted index on column: myCol2, it can only be applied to dictionary encoded columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1258,7 +1263,9 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should not be able to disable dictionary but keep inverted index");
} catch (Exception e) {
- Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+ Assert.assertEquals(e.getMessage(),
+ "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value "
+ + "string columns");
}
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1433,7 +1440,7 @@ public class TableConfigUtilsTest {
}
starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"), null, null,
- Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", FieldConfig.CompressionCodec.LZ4)), 1);
+ Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", CompressionCodec.LZ4)), 1);
tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build();
try {
@@ -1798,8 +1805,7 @@ public class TableConfigUtilsTest {
try {
TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
} catch (IllegalStateException e) {
- Assert.assertEquals(e.getMessage(),
- "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+ Assert.assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
}
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java
new file mode 100644
index 0000000000..82a60ed869
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.compression;
+
+/**
+ * Compression applied to the dictionary ids stored in a dictionary-encoded forward index; each type is SV/MV scoped.
+ */
+public enum DictIdCompressionType {
+ // MV only (not applicable to SV): add a second level dictionary encoding for the multi-value entries
+ MV_ENTRY_DICT(false, true); // arguments are (applicableToSV, applicableToMV)
+
+ private final boolean _applicableToSV; // true when the type may be used for single-value (SV) columns
+ private final boolean _applicableToMV; // true when the type may be used for multi-value (MV) columns
+
+ DictIdCompressionType(boolean applicableToSV, boolean applicableToMV) {
+ _applicableToSV = applicableToSV;
+ _applicableToMV = applicableToMV;
+ }
+
+ public boolean isApplicableToSV() { // returns the SV flag given at construction
+ return _applicableToSV;
+ }
+
+ public boolean isApplicableToMV() { // returns the MV flag given at construction
+ return _applicableToMV;
+ }
+
+ public boolean isApplicable(boolean isSingleValue) { // selects the SV flag when isSingleValue, else the MV flag
+ return isSingleValue ? isApplicableToSV() : isApplicableToMV();
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 445dcc0b71..fcdbbe4fe0 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -21,19 +21,19 @@ package org.apache.pinot.segment.spi.index;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.IndexConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
public class ForwardIndexConfig extends IndexConfig {
public static final int DEFAULT_RAW_WRITER_VERSION = 2;
- public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null);
+ public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null);
public static final ForwardIndexConfig DEFAULT = new Builder().build();
@Nullable
@@ -41,15 +41,20 @@ public class ForwardIndexConfig extends IndexConfig {
private final boolean _deriveNumDocsPerChunk;
private final int _rawIndexWriterVersion;
+ @Nullable
+ private final DictIdCompressionType _dictIdCompressionType;
+
@JsonCreator
- public ForwardIndexConfig(@Nullable @JsonProperty("disabled") Boolean disabled,
- @Nullable @JsonProperty("chunkCompressionType") ChunkCompressionType chunkCompressionType,
+ public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
+ @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
@JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk,
- @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion) {
+ @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion,
+ @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) {
super(disabled);
_chunkCompressionType = chunkCompressionType;
_deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk;
_rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
+ _dictIdCompressionType = dictIdCompressionType;
}
@Nullable
@@ -65,12 +70,17 @@ public class ForwardIndexConfig extends IndexConfig {
return _rawIndexWriterVersion;
}
+ @Nullable
+ public DictIdCompressionType getDictIdCompressionType() { // value given to the constructor, unchanged; may be null
+ return _dictIdCompressionType;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (!(o instanceof ForwardIndexConfig)) {
return false;
}
if (!super.equals(o)) {
@@ -78,12 +88,14 @@ public class ForwardIndexConfig extends IndexConfig {
}
ForwardIndexConfig that = (ForwardIndexConfig) o;
return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
- && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType;
+ && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType
+ && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
+ return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
+ _dictIdCompressionType);
}
public static class Builder {
@@ -92,6 +104,9 @@ public class ForwardIndexConfig extends IndexConfig {
private boolean _deriveNumDocsPerChunk = false;
private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
+ @Nullable
+ private DictIdCompressionType _dictIdCompressionType;
+
public Builder() {
}
@@ -99,6 +114,7 @@ public class ForwardIndexConfig extends IndexConfig {
_chunkCompressionType = other.getChunkCompressionType();
_deriveNumDocsPerChunk = other._deriveNumDocsPerChunk;
_rawIndexWriterVersion = other._rawIndexWriterVersion;
+ _dictIdCompressionType = other._dictIdCompressionType;
}
public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
@@ -116,6 +132,39 @@ public class ForwardIndexConfig extends IndexConfig {
return this;
}
+ public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) { // sets only this field
+ _dictIdCompressionType = dictIdCompressionType; // builder field is @Nullable, so null is storable
+ return this; // returns the same builder for call chaining
+ }
+
+ public Builder withCompressionCodec(CompressionCodec compressionCodec) { // maps a FieldConfig codec to builder state
+ if (compressionCodec == null) { // null clears both the chunk and the dict-id compression type
+ _chunkCompressionType = null;
+ _dictIdCompressionType = null;
+ return this;
+ }
+ // A non-null codec assigns exactly one of the two fields; the other keeps whatever value it already had.
+ // NOTE(review): unlike the null branch, nothing is reset here - confirm a leftover value is intended.
+ switch (compressionCodec) {
+ case PASS_THROUGH:
+ _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+ break;
+ case SNAPPY:
+ _chunkCompressionType = ChunkCompressionType.SNAPPY;
+ break;
+ case ZSTANDARD:
+ _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
+ break;
+ case LZ4:
+ _chunkCompressionType = ChunkCompressionType.LZ4;
+ break;
+ case MV_ENTRY_DICT: // the only codec in this switch that sets the dict-id compression type
+ _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+ break;
+ default: // reached only by a CompressionCodec constant that has no case above
+ throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+ }
+ return this;
+ }
+
public Builder withLegacyProperties(Map<String, Map<String, String>> propertiesByCol, String colName) {
if (propertiesByCol != null) {
Map<String, String> colProps = propertiesByCol.get(colName);
@@ -139,18 +188,8 @@ public class ForwardIndexConfig extends IndexConfig {
}
public ForwardIndexConfig build() {
- return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
- }
- }
-
- @Override
- public String toString() {
- try {
- return JsonUtils.objectToString(this);
- } catch (IOException ex) {
- return "{" + "\"chunkCompressionType\":" + _chunkCompressionType
- + ", \"deriveNumDocsPerChunk\":" + _deriveNumDocsPerChunk
- + ", \"rawIndexWriterVersion\":" + _rawIndexWriterVersion + '}';
+ return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
+ _dictIdCompressionType);
}
}
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 5882986edd..e067376472 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -25,6 +25,7 @@ import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
import org.apache.pinot.segment.spi.index.IndexReader;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -57,12 +58,20 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
/**
* Returns the compression type (if valid). Only valid for RAW forward index columns implemented in
* BaseChunkForwardIndexReader.
- * @return
*/
+ @Nullable
default ChunkCompressionType getCompressionType() {
return null;
}
+ /**
+ * Returns the compression type for dictionary encoded forward index.
+ * This default implementation returns {@code null}; the value is declared {@code @Nullable}, so callers must
+ * handle a missing compression type.
+ */
+ @Nullable
+ default DictIdCompressionType getDictIdCompressionType() {
+ return null;
+ }
+
/**
* Returns the length of the longest entry. Only valid for RAW forward index columns implemented in
* BaseChunkForwardIndexReader. Returns -1 otherwise.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 30a42189a4..47a846d878 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -119,7 +119,29 @@ public class FieldConfig extends BaseJsonConfig {
}
public enum CompressionCodec {
- PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4
+ PASS_THROUGH(true, false), // arguments are (applicableToRawIndex, applicableToDictEncodedIndex)
+ SNAPPY(true, false),
+ ZSTANDARD(true, false),
+ LZ4(true, false),
+
+ // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries
+ MV_ENTRY_DICT(false, true); // the only constant flagged for dict-encoded index and not for raw index
+
+ private final boolean _applicableToRawIndex; // first constructor argument
+ private final boolean _applicableToDictEncodedIndex; // second constructor argument
+
+ CompressionCodec(boolean applicableToRawIndex, boolean applicableToDictEncodedIndex) {
+ _applicableToRawIndex = applicableToRawIndex;
+ _applicableToDictEncodedIndex = applicableToDictEncodedIndex;
+ }
+
+ public boolean isApplicableToRawIndex() { // flag for raw forward index
+ return _applicableToRawIndex;
+ }
+
+ public boolean isApplicableToDictEncodedIndex() { // flag for dictionary-encoded forward index
+ return _applicableToDictEncodedIndex;
+ }
}
public String getName() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org