You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2019/07/06 03:41:10 UTC
[incubator-pinot] branch master updated: #4317 Feature/variable
length bytes offline dictionary for indexing bytes and string dicts.
(#4321)
This is an automated email from the ASF dual-hosted git repository.
mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 49d8fa7 #4317 Feature/variable length bytes offline dictionary for indexing bytes and string dicts. (#4321)
49d8fa7 is described below
commit 49d8fa7907efc4192c3ecd5fef67c33e657c39f5
Author: Buchi Reddy Busi Reddy <ma...@gmail.com>
AuthorDate: Fri Jul 5 20:41:04 2019 -0700
#4317 Feature/variable length bytes offline dictionary for indexing bytes and string dicts. (#4321)
* Adding a variable length bytes value reader for offline dictionary index.
* Make variables private.
* Cleaned up the changes a bit and removed the hacks to use the
variable length byte dictionary.
In the next iteration, we'll be deciding dynamically whether to use
fixed or variable length impl.
* Minor cleanup.
* Adding stats about the string values length in the StringDictionaryPerfTest.java
* Improved the magic header for VariableLength dictionary.
Also fixed other review comments from Kishore.
* Making the feature of whether to use variable length dictionary
for a column configurable through TableConfig.
* Use the variable length dictionary config in realtime segments path too.
* Fixed a bug that was caught by the unit tests.
* Adding variable length dictionary config in more places like:
- IndexLoadingConfig
- RealtimeSegmentConfig
* Use separate file names for different tests in a single class. This was
causing a race condition and unit test failure.
* Don't set a null value for varLengthDictionaryColumns
* Fixed review comments.
* Fixed review comments and formatted the code as per Pinot code style.
* Variable length dictionary shouldn't support getPaddedString()
since we don't expect it to ever be called.
* Because of the way the byte[] returned from ImmutableDictionaryReader is reused,
we can't safely reuse it here, so the ThreadLocal byte[] was removed.
Thanks to the unit test which caught this issue :thumbsup:
* Explicitly persist the data section offset in the header of the store
so that we don't need to make any assumptions about where the data starts.
---
.../apache/pinot/common/config/IndexingConfig.java | 20 +-
.../pinot/common/config/IndexingConfigTest.java | 3 +
.../realtime/HLRealtimeSegmentDataManager.java | 5 +
.../realtime/LLRealtimeSegmentDataManager.java | 6 +-
.../generator/SegmentGeneratorConfig.java | 13 ++
.../org/apache/pinot/core/io/util/ValueReader.java | 3 +-
.../io/util/VarLengthBytesValueReaderWriter.java | 240 +++++++++++++++++++++
.../converter/RealtimeSegmentConverter.java | 11 +-
.../core/realtime/impl/RealtimeSegmentConfig.java | 20 +-
.../segment/creator/ColumnIndexCreationInfo.java | 10 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 3 +-
.../creator/impl/SegmentDictionaryCreator.java | 72 +++++--
.../impl/SegmentIndexCreationDriverImpl.java | 6 +-
.../segment/index/loader/IndexLoadingConfig.java | 11 +
.../defaultcolumn/BaseDefaultColumnHandler.java | 8 +-
.../index/readers/ImmutableDictionaryReader.java | 18 +-
.../segment/store/SegmentLocalFSDirectory.java | 2 +-
.../MutableSegmentImplAggregateMetricsTest.java | 1 +
.../mutable/MutableSegmentImplTest.java | 3 +-
.../mutable/MutableSegmentImplTestUtils.java | 4 +-
.../util/VarLengthBytesValueReaderWriterTest.java | 118 ++++++++++
.../pinot/perf/BenchmarkDictionaryCreation.java | 12 ++
.../pinot/perf/StringDictionaryPerfTest.java | 86 ++++++--
.../realtime/provisioning/MemoryEstimator.java | 18 +-
24 files changed, 626 insertions(+), 67 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
index 6c0fb65..6172f52 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/IndexingConfig.java
@@ -87,6 +87,14 @@ public class IndexingConfig {
@ConfigKey("aggregateMetrics")
private boolean _aggregateMetrics;
+ /**
+ * The list of columns for which the variable length dictionary needs to be enabled in offline
+ * segments. This is only valid for string and bytes columns and has no impact for columns of
+ * other data types.
+ */
+ @ConfigKey("varLengthDictionaryColumns")
+ private List<String> _varLengthDictionaryColumns;
+
public List<String> getInvertedIndexColumns() {
return _invertedIndexColumns;
}
@@ -223,6 +231,14 @@ public class IndexingConfig {
return _aggregateMetrics;
}
+ /**
+ * Returns the columns configured to use the variable length dictionary. May be null when the
+ * config does not set them.
+ */
+ public List<String> getVarLengthDictionaryColumns() {
+ return this._varLengthDictionaryColumns;
+ }
+
+ /**
+ * Sets the columns that should use the variable length dictionary.
+ */
+ public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
+ this._varLengthDictionaryColumns = varLengthDictionaryColumns;
+ }
+
@Override
public String toString() {
final StringBuilder result = new StringBuilder();
@@ -279,7 +295,8 @@ public class IndexingConfig {
.isEqual(_onHeapDictionaryColumns, that._onHeapDictionaryColumns) && EqualityUtils
.isEqual(_starTreeIndexSpec, that._starTreeIndexSpec) && EqualityUtils
.isEqual(_segmentPartitionConfig, that._segmentPartitionConfig) && EqualityUtils
- .isEqual(_bloomFilterColumns, that._bloomFilterColumns);
+ .isEqual(_bloomFilterColumns, that._bloomFilterColumns) && EqualityUtils
+ .isEqual(_varLengthDictionaryColumns, that._varLengthDictionaryColumns);
}
@Override
@@ -298,6 +315,7 @@ public class IndexingConfig {
result = EqualityUtils.hashCodeOf(result, _starTreeIndexSpec);
result = EqualityUtils.hashCodeOf(result, _segmentPartitionConfig);
result = EqualityUtils.hashCodeOf(result, _bloomFilterColumns);
+ result = EqualityUtils.hashCodeOf(result, _varLengthDictionaryColumns);
return result;
}
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java b/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
index e3d31d5..fd782fe 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/config/IndexingConfigTest.java
@@ -54,6 +54,8 @@ public class IndexingConfigTest {
noDictionaryConfig.put("a", "SNAPPY");
noDictionaryConfig.put("b", "PASS_THROUGH");
indexingConfig.setnoDictionaryConfig(noDictionaryConfig);
+ List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z");
+ indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
indexingConfig = JsonUtils.stringToObject(JsonUtils.objectToString(indexingConfig), IndexingConfig.class);
@@ -64,6 +66,7 @@ public class IndexingConfigTest {
assertEquals(indexingConfig.getOnHeapDictionaryColumns(), onHeapDictionaryColumns);
assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns);
assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig);
+ assertEquals(indexingConfig.getVarLengthDictionaryColumns(), varLengthDictionaryColumns);
}
@Test
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index b8b7f17..d5b223a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -92,6 +92,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final String sortedColumn;
private final List<String> invertedIndexColumns;
private final List<String> noDictionaryColumns;
+ private final List<String> varLengthDictionaryColumns;
private Logger segmentLogger = LOGGER;
private final SegmentVersion _segmentVersion;
@@ -145,6 +146,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// No DictionaryColumns
noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
+ varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+
_streamConfig = new StreamConfig(tableConfig.getIndexingConfig().getStreamConfigs());
segmentLogger = LoggerFactory.getLogger(
@@ -181,6 +184,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
.setSchema(schema).setCapacity(capacity)
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
+ .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(realtimeSegmentZKMetadata)
.setOffHeap(indexLoadingConfig.isRealtimeOffheapAllocation()).setMemoryManager(
getMemoryManager(realtimeTableDataManager.getConsumerDir(), segmentName,
@@ -259,6 +263,7 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
new RealtimeSegmentConverter(realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
tableNameWithType, timeColumnName, realtimeSegmentZKMetadata.getSegmentName(),
sortedColumn, HLRealtimeSegmentDataManager.this.invertedIndexColumns, noDictionaryColumns,
+ varLengthDictionaryColumns,
null/*StarTreeIndexSpec*/); // Star tree not supported for HLC.
segmentLogger.info("Trying to build segment");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 61ed73c..e2cb15c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -237,6 +237,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final String _timeColumnName;
private final List<String> _invertedIndexColumns;
private final List<String> _noDictionaryColumns;
+ private final List<String> _varLengthDictionaryColumns;
private final StarTreeIndexSpec _starTreeIndexSpec;
private final String _sortedColumn;
private Logger segmentLogger;
@@ -662,7 +663,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
_tableNameWithType, _timeColumnName, _segmentZKMetadata.getSegmentName(), _sortedColumn,
- _invertedIndexColumns, _noDictionaryColumns, _starTreeIndexSpec);
+ _invertedIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns, _starTreeIndexSpec);
segmentLogger.info("Trying to build segment");
try {
converter.build(_segmentVersion, _serverMetrics);
@@ -1088,6 +1089,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// No dictionary Columns
_noDictionaryColumns = new ArrayList<>(indexLoadingConfig.getNoDictionaryColumns());
+ _varLengthDictionaryColumns = new ArrayList<>(indexLoadingConfig.getVarLengthDictionaryColumns());
+
// Read the star tree config
_starTreeIndexSpec = indexingConfig.getStarTreeIndexSpec();
@@ -1106,6 +1109,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
.setSchema(schema).setCapacity(_segmentMaxRowCount)
.setAvgNumMultiValues(indexLoadingConfig.getRealtimeAvgMultiValueCount())
.setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
+ .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
.setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
.setStatsHistory(realtimeTableDataManager.getStatsHistory())
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index bc176bd..4b4251c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -80,6 +80,7 @@ public class SegmentGeneratorConfig {
private Map<String, ChunkCompressorFactory.CompressionType> _rawIndexCompressionType = new HashMap<>();
private List<String> _invertedIndexCreationColumns = new ArrayList<>();
private List<String> _columnSortOrder = new ArrayList<>();
+ private List<String> _varLengthDictionaryColumns = new ArrayList<>();
private String _dataDir = null;
private String _inputFilePath = null;
private FileFormat _format = FileFormat.AVRO;
@@ -128,6 +129,7 @@ public class SegmentGeneratorConfig {
_rawIndexCompressionType.putAll(config._rawIndexCompressionType);
_invertedIndexCreationColumns.addAll(config._invertedIndexCreationColumns);
_columnSortOrder.addAll(config._columnSortOrder);
+ _varLengthDictionaryColumns.addAll(config._varLengthDictionaryColumns);
_dataDir = config._dataDir;
_inputFilePath = config._inputFilePath;
_format = config._format;
@@ -187,6 +189,9 @@ public class SegmentGeneratorConfig {
this.setRawIndexCompressionType(serializedNoDictionaryColumnMap);
}
}
+ if (indexingConfig.getVarLengthDictionaryColumns() != null) {
+ setVarLengthDictionaryColumns(indexingConfig.getVarLengthDictionaryColumns());
+ }
_segmentPartitionConfig = indexingConfig.getSegmentPartitionConfig();
// Star-tree V1 config
@@ -276,6 +281,14 @@ public class SegmentGeneratorConfig {
_columnSortOrder.addAll(sortOrder);
}
+ /**
+ * Returns the (mutable) list of columns that should use the variable length dictionary.
+ */
+ public List<String> getVarLengthDictionaryColumns() {
+ return this._varLengthDictionaryColumns;
+ }
+
+ /**
+ * Sets the columns that should use the variable length dictionary. The given list is copied so
+ * later changes to the caller's list (e.g. the table config's) don't leak into this config, and
+ * null is treated as an empty list so the field stays non-null like its initializer.
+ */
+ public void setVarLengthDictionaryColumns(List<String> varLengthDictionaryColumns) {
+ _varLengthDictionaryColumns =
+ varLengthDictionaryColumns == null ? new ArrayList<>() : new ArrayList<>(varLengthDictionaryColumns);
+ }
+
public void createInvertedIndexForColumn(String column) {
Preconditions.checkNotNull(column);
if (_schema != null && _schema.getFieldSpecFor(column) == null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
index 644c06b..46340c4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/ValueReader.java
@@ -39,6 +39,5 @@ public interface ValueReader {
byte[] getBytes(int index, int numBytesPerValue, byte[] buffer);
- void close()
- throws IOException;
+ void close() throws IOException;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
new file mode 100644
index 0000000..45e840b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriter.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+
+/**
+ * An immutable implementation of {@link ValueReader} that allows each byte[] to be of variable
+ * length, thereby avoiding unnecessary padding. Since this is an immutable data structure, the
+ * full data has to be given at initialization time and there are no write() methods.
+ *
+ * The layout of the file is as follows:
+ * <p> Header Section: </p>
+ * <ul>
+ * <li> Magic bytes: Chosen to be ".vl;" to avoid conflicts with the fixed size
+ * {@link ValueReader} implementations. By having special characters, this avoids conflicts
+ * with regular bytes/strings dictionaries.
+ * </li>
+ * <li> Version number: This is an integer and can be used for the evolution of the store
+ * implementation by incrementing version for every incompatible change to the store/format.
+ * </li>
+ * <li> Number of elements in the store. </li>
+ * <li> The offset where the data section starts. Though the data section usually starts right after
+ * the header, having this explicitly lets the store evolve freely without any
+ * assumptions about where the data section starts.
+ * </li>
+ * </ul>
+ *
+ * <p> Data section: </p>
+ * <ul>
+ * <li> Offsets Array: Integer offsets to the start position of each byte array.
+ * Example: [O(0), O(1), ..., O(n-1), O(n)] where O is the Offset function and n is the number
+ * of elements. The length of the i-th element is computed as O(i+1) - O(i). The extra trailing
+ * offset O(n) lets the last element's length be computed with the same formula, without
+ * depending on the underlying buffer's size.
+ * </li>
+ * <li> All byte arrays or values. </li>
+ * </ul>
+ *
+ * <p> Because all offsets are persisted as 4-byte ints, the whole store must fit in
+ * {@link Integer#MAX_VALUE} bytes. </p>
+ *
+ * @see FixedByteValueReaderWriter
+ */
+public class VarLengthBytesValueReaderWriter implements Closeable, ValueReader {
+
+ /**
+ * Magic bytes used to identify the dictionary files written in variable length bytes format.
+ */
+ private static final byte[] MAGIC_BYTES = StringUtil.encodeUtf8(".vl;");
+
+ /**
+ * Increment this version if there are any structural changes in the store format and
+ * deal with backward compatibility correctly based on old versions.
+ */
+ private static final int VERSION = 1;
+
+ // Offsets of different fields in the header. Having as constants for readability.
+ private static final int VERSION_OFFSET = MAGIC_BYTES.length;
+ private static final int NUM_ELEMENTS_OFFSET = VERSION_OFFSET + Integer.BYTES;
+ private static final int DATA_SECTION_OFFSET_POSITION = NUM_ELEMENTS_OFFSET + Integer.BYTES;
+ private static final int HEADER_LENGTH = DATA_SECTION_OFFSET_POSITION + Integer.BYTES;
+
+ private final PinotDataBuffer _dataBuffer;
+
+ /**
+ * The offset of the data section in the buffer/store. This info is persisted in the header
+ * so it has to be read from the buffer while initializing the store in read cases.
+ */
+ private final int _dataSectionStartOffSet;
+
+ /**
+ * Total number of values present in the store.
+ */
+ private final int _numElements;
+
+ /**
+ * Constructor to create a VarLengthBytesValueReaderWriter from a previously written buffer.
+ * The header is not validated here; use {@link #isVarLengthBytesDictBuffer(PinotDataBuffer)}
+ * to check the buffer format first.
+ */
+ public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer) {
+ _dataBuffer = dataBuffer;
+
+ // To prepare ourselves to start reading the data, read the element count and data offset.
+ _numElements = dataBuffer.getInt(NUM_ELEMENTS_OFFSET);
+ _dataSectionStartOffSet = dataBuffer.getInt(DATA_SECTION_OFFSET_POSITION);
+ }
+
+ /**
+ * Constructor to create a new immutable store with the given data.
+ *
+ * @param dataBuffer buffer to write into; must hold at least {@link #getRequiredSize(byte[][])} bytes
+ * @param byteArrays values to store, in index order
+ * @throws IllegalArgumentException if the store would exceed {@link Integer#MAX_VALUE} bytes or
+ * the buffer is too small to hold it
+ */
+ public VarLengthBytesValueReaderWriter(PinotDataBuffer dataBuffer, byte[][] byteArrays) {
+ long requiredSize = getRequiredSize(byteArrays);
+ // Offsets are persisted as ints; a larger store would silently overflow them and corrupt the data.
+ if (requiredSize > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException("Variable length store needs " + requiredSize
+ + " bytes, which exceeds the maximum of " + Integer.MAX_VALUE + " bytes");
+ }
+ // Fail fast instead of writing past the end of the buffer.
+ if (dataBuffer.size() < requiredSize) {
+ throw new IllegalArgumentException(
+ "Buffer size " + dataBuffer.size() + " is smaller than the required size " + requiredSize);
+ }
+
+ _dataBuffer = dataBuffer;
+ _numElements = byteArrays.length;
+
+ // For now, start writing the data section right after the header but if this store evolves,
+ // we could decide to start the data section somewhere else.
+ _dataSectionStartOffSet = HEADER_LENGTH;
+
+ write(byteArrays);
+ }
+
+ /**
+ * Returns the number of bytes needed to store the given values: the header, n + 1 offsets and
+ * the concatenated values.
+ */
+ public static long getRequiredSize(byte[][] byteArrays) {
+ // First include the header and then the data section.
+ // Remember there are n+1 offsets. Use long arithmetic so large inputs can't overflow.
+ long length = HEADER_LENGTH + (long) Integer.BYTES * ((long) byteArrays.length + 1);
+
+ for (byte[] array : byteArrays) {
+ length += array.length;
+ }
+ return length;
+ }
+
+ /**
+ * Returns true if the buffer starts with a header of this store format: the magic bytes, the
+ * supported version, a non-negative element count and a data section offset that does not
+ * overlap the header.
+ */
+ public static boolean isVarLengthBytesDictBuffer(PinotDataBuffer buffer) {
+ // A valid store holds at least the header plus the trailing offset, so a buffer that is not
+ // larger than the header can't be a var length dictionary.
+ if (buffer.size() <= HEADER_LENGTH) {
+ return false;
+ }
+
+ byte[] magicBytes = new byte[MAGIC_BYTES.length];
+ buffer.copyTo(0, magicBytes, 0, MAGIC_BYTES.length);
+ if (!Arrays.equals(MAGIC_BYTES, magicBytes)) {
+ return false;
+ }
+
+ // Verify the version.
+ if (buffer.getInt(VERSION_OFFSET) != VERSION) {
+ return false;
+ }
+
+ // Also verify that there is a valid numElements value and that the data section starts after the header.
+ return buffer.getInt(NUM_ELEMENTS_OFFSET) >= 0 && buffer.getInt(DATA_SECTION_OFFSET_POSITION) >= HEADER_LENGTH;
+ }
+
+ private void writeHeader() {
+ for (int offset = 0; offset < MAGIC_BYTES.length; offset++) {
+ _dataBuffer.putByte(offset, MAGIC_BYTES[offset]);
+ }
+ _dataBuffer.putInt(VERSION_OFFSET, VERSION);
+ _dataBuffer.putInt(NUM_ELEMENTS_OFFSET, _numElements);
+ _dataBuffer.putInt(DATA_SECTION_OFFSET_POSITION, _dataSectionStartOffSet);
+ }
+
+ private void write(byte[][] byteArrays) {
+ writeHeader();
+
+ // The values start right after the n + 1 offsets. The constructor has verified the whole
+ // store fits in an int, so this arithmetic can't overflow.
+ int valuesStartOffset = _dataSectionStartOffSet + Integer.BYTES * (byteArrays.length + 1);
+
+ // Then write the offset of each of the byte array in the data buffer.
+ int nextOffset = _dataSectionStartOffSet;
+ int nextArrayStartOffset = valuesStartOffset;
+ for (byte[] array : byteArrays) {
+ _dataBuffer.putInt(nextOffset, nextArrayStartOffset);
+ nextOffset += Integer.BYTES;
+ nextArrayStartOffset += array.length;
+ }
+
+ // Write the additional offset to easily get the length of last array.
+ _dataBuffer.putInt(nextOffset, nextArrayStartOffset);
+
+ // Finally write the byte arrays.
+ nextArrayStartOffset = valuesStartOffset;
+ for (byte[] array : byteArrays) {
+ _dataBuffer.readFrom(nextArrayStartOffset, array);
+ nextArrayStartOffset += array.length;
+ }
+ }
+
+ public int getNumElements() {
+ return _numElements;
+ }
+
+ @Override
+ public int getInt(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getLong(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public float getFloat(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double getDouble(int index) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the value at the given index decoded as UTF-8. Values are stored without padding, so
+ * {@code paddingByte} and {@code numBytesPerValue} are ignored.
+ */
+ @Override
+ public String getUnpaddedString(int index, int numBytesPerValue, byte paddingByte, byte[] buffer) {
+ return StringUtil.decodeUtf8(getBytes(index, numBytesPerValue, buffer));
+ }
+
+ /**
+ * Not supported: values in this store are never padded.
+ */
+ @Override
+ public String getPaddedString(int index, int numBytesPerValue, byte[] buffer) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the value at the given index. {@code numBytesPerValue} is ignored since values have
+ * variable length; {@code buffer} is reused only if its length exactly matches the value's
+ * length, otherwise a new array is allocated.
+ *
+ * @throws IndexOutOfBoundsException if index is negative or not less than the number of elements
+ */
+ @Override
+ public byte[] getBytes(int index, int numBytesPerValue, byte[] buffer) {
+ // Without this check, index == numElements would treat the trailing offset as a start offset
+ // and read value bytes as the end offset, returning garbage.
+ if (index < 0 || index >= _numElements) {
+ throw new IndexOutOfBoundsException("Index: " + index + ", numElements: " + _numElements);
+ }
+
+ // Read the offset of the byte array first and then read the actual byte array.
+ int offset = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * index);
+
+ // To get the length of the byte array, we use the next byte array offset.
+ int length = _dataBuffer.getInt(_dataSectionStartOffSet + Integer.BYTES * (index + 1)) - offset;
+
+ // The returned array must have exactly the value's length, so the caller's buffer can only be
+ // reused when its length matches; otherwise (or when no buffer is given) allocate one.
+ byte[] b = (buffer != null && buffer.length == length) ? buffer : new byte[length];
+ _dataBuffer.copyTo(offset, b, 0, length);
+ return b;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ _dataBuffer.close();
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 40be8e7..d833603 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -54,10 +54,12 @@ public class RealtimeSegmentConverter {
private List<String> invertedIndexColumns;
private List<String> noDictionaryColumns;
private StarTreeIndexSpec starTreeIndexSpec;
+ private List<String> varLengthDictionaryColumns;
public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
String tableName, String timeColumnName, String segmentName, String sortedColumn,
- List<String> invertedIndexColumns, List<String> noDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
+ List<String> invertedIndexColumns, List<String> noDictionaryColumns,
+ List<String> varLengthDictionaryColumns, StarTreeIndexSpec starTreeIndexSpec) {
if (new File(outputPath).exists()) {
throw new IllegalAccessError("path already exists:" + outputPath);
}
@@ -73,13 +75,14 @@ public class RealtimeSegmentConverter {
this.tableName = tableName;
this.segmentName = segmentName;
this.noDictionaryColumns = noDictionaryColumns;
+ this.varLengthDictionaryColumns = varLengthDictionaryColumns;
this.starTreeIndexSpec = starTreeIndexSpec;
}
public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
String tableName, String timeColumnName, String segmentName, String sortedColumn) {
this(realtimeSegment, outputPath, schema, tableName, timeColumnName, segmentName, sortedColumn, new ArrayList<>(),
- new ArrayList<>(), null/*StarTreeIndexSpec*/);
+ new ArrayList<>(), new ArrayList<>(), null/*StarTreeIndexSpec*/);
}
public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
@@ -109,6 +112,10 @@ public class RealtimeSegmentConverter {
genConfig.setRawIndexCompressionType(columnToCompressionType);
}
+ if (varLengthDictionaryColumns != null) {
+ genConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns);
+ }
+
// Presence of the spec enables star tree generation.
if (starTreeIndexSpec != null) {
genConfig.enableStarTreeIndex(starTreeIndexSpec);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index 63ebbbb..6d18265 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -32,6 +32,7 @@ public class RealtimeSegmentConfig {
private final int _capacity;
private final int _avgNumMultiValues;
private final Set<String> _noDictionaryColumns;
+ private final Set<String> _varLengthDictionaryColumns;
private final Set<String> _invertedIndexColumns;
private final RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
private final boolean _offHeap;
@@ -41,8 +42,9 @@ public class RealtimeSegmentConfig {
private final boolean _aggregateMetrics;
private RealtimeSegmentConfig(String segmentName, String streamName, Schema schema, int capacity,
- int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> invertedIndexColumns,
- RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
+ int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
+ Set<String> invertedIndexColumns, RealtimeSegmentZKMetadata realtimeSegmentZKMetadata,
+ boolean offHeap, PinotDataBufferMemoryManager memoryManager,
RealtimeSegmentStatsHistory statsHistory, SegmentPartitionConfig segmentPartitionConfig,
boolean aggregateMetrics) {
_segmentName = segmentName;
@@ -51,6 +53,7 @@ public class RealtimeSegmentConfig {
_capacity = capacity;
_avgNumMultiValues = avgNumMultiValues;
_noDictionaryColumns = noDictionaryColumns;
+ _varLengthDictionaryColumns = varLengthDictionaryColumns;
_invertedIndexColumns = invertedIndexColumns;
_realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
_offHeap = offHeap;
@@ -84,6 +87,10 @@ public class RealtimeSegmentConfig {
return _noDictionaryColumns;
}
+ /**
+ * Returns the set of columns configured (via the builder) to use the variable length dictionary.
+ */
+ public Set<String> getVarLengthDictionaryColumns() {
+ return this._varLengthDictionaryColumns;
+ }
+
public Set<String> getInvertedIndexColumns() {
return _invertedIndexColumns;
}
@@ -119,6 +126,7 @@ public class RealtimeSegmentConfig {
private int _capacity;
private int _avgNumMultiValues;
private Set<String> _noDictionaryColumns;
+ private Set<String> _varLengthDictionaryColumns;
private Set<String> _invertedIndexColumns;
private RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
private boolean _offHeap;
@@ -160,6 +168,11 @@ public class RealtimeSegmentConfig {
return this;
}
+ /**
+ * Sets the columns that should use the variable length dictionary and returns this builder.
+ */
+ public Builder setVarLengthDictionaryColumns(Set<String> varLengthDictionaryColumns) {
+ this._varLengthDictionaryColumns = varLengthDictionaryColumns;
+ return this;
+ }
+
public Builder setInvertedIndexColumns(Set<String> invertedIndexColumns) {
_invertedIndexColumns = invertedIndexColumns;
return this;
@@ -197,7 +210,8 @@ public class RealtimeSegmentConfig {
public RealtimeSegmentConfig build() {
return new RealtimeSegmentConfig(_segmentName, _streamName, _schema, _capacity, _avgNumMultiValues,
- _noDictionaryColumns, _invertedIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
+ _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
+ _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
_statsHistory, _segmentPartitionConfig, _aggregateMetrics);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 84f74d1..e435188 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -26,6 +26,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
public class ColumnIndexCreationInfo {
private final boolean createDictionary;
+ private final boolean useVarLengthDictionary;
private final ForwardIndexType forwardIndexType;
private final InvertedIndexType invertedIndexType;
private final boolean isAutoGenerated;
@@ -33,10 +34,11 @@ public class ColumnIndexCreationInfo {
private final ColumnStatistics columnStatistics;
public ColumnIndexCreationInfo(ColumnStatistics columnStatistics, boolean createDictionary,
- ForwardIndexType forwardIndexType, InvertedIndexType invertedIndexType, boolean isAutoGenerated,
- Object defaultNullValue) {
+ boolean useVarLengthDictionary, ForwardIndexType forwardIndexType,
+ InvertedIndexType invertedIndexType, boolean isAutoGenerated, Object defaultNullValue) {
this.columnStatistics = columnStatistics;
this.createDictionary = createDictionary;
+ this.useVarLengthDictionary = useVarLengthDictionary;
this.forwardIndexType = forwardIndexType;
this.invertedIndexType = invertedIndexType;
this.isAutoGenerated = isAutoGenerated;
@@ -47,6 +49,10 @@ public class ColumnIndexCreationInfo {
return createDictionary;
}
+ /**
+ * Returns the flag given at construction telling whether a variable length dictionary was
+ * requested for this column.
+ */
+ public boolean isUseVarLengthDictionary() {
+ return this.useVarLengthDictionary;
+ }
+
public Object getMin() {
return columnStatistics.getMinValue();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2016776..7b08a48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -129,7 +129,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
// Initialize dictionary creator
SegmentDictionaryCreator dictionaryCreator =
- new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir);
+ new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
+ fieldSpec, _indexDir, indexCreationInfo.isUseVarLengthDictionary());
_dictionaryCreatorMap.put(columnName, dictionaryCreator);
// Create dictionary
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
index f9243ef..105c5fa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.primitive.ByteArray;
import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +46,7 @@ public class SegmentDictionaryCreator implements Closeable {
private final Object _sortedValues;
private final FieldSpec _fieldSpec;
private final File _dictionaryFile;
+ private final boolean _useVarLengthDictionary;
private Int2IntOpenHashMap _intValueToIndexMap;
private Long2IntOpenHashMap _longValueToIndexMap;
@@ -54,12 +56,18 @@ public class SegmentDictionaryCreator implements Closeable {
private Object2IntOpenHashMap<ByteArray> _bytesValueToIndexMap;
private int _numBytesPerEntry = 0;
- public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir)
- throws IOException {
+ /**
+ * Creates a dictionary creator for one column and touches its dictionary file under {@code indexDir}.
+ *
+ * @param sortedValues sorted unique values of the column (callers pass the sorted unique elements array)
+ * @param fieldSpec field spec of the column; its name determines the dictionary file name
+ * @param indexDir directory in which the dictionary file is created
+ * @param useVarLengthDictionary when writing a byte-based (STRING/BYTES) dictionary, use the variable length
+ *        bytes format instead of the fixed (padded) length format
+ * @throws IOException if the dictionary file cannot be created
+ */
+ public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir,
+ boolean useVarLengthDictionary) throws IOException {
_sortedValues = sortedValues;
_fieldSpec = fieldSpec;
_dictionaryFile = new File(indexDir, fieldSpec.getName() + V1Constants.Dict.FILE_EXTENSION);
FileUtils.touch(_dictionaryFile);
+ _useVarLengthDictionary = useVarLengthDictionary;
+ }
+
+ /**
+ * Backward-compatible constructor: same as the four-argument constructor with
+ * {@code useVarLengthDictionary = false}, i.e. the fixed length dictionary format.
+ */
+ public SegmentDictionaryCreator(Object sortedValues, FieldSpec fieldSpec, File indexDir)
+ throws IOException {
+ this(sortedValues, fieldSpec, indexDir, false);
}
public void build()
@@ -163,16 +171,7 @@ public class SegmentDictionaryCreator implements Closeable {
_numBytesPerEntry = Math.max(_numBytesPerEntry, valueBytes.length);
}
- // Backward-compatible: index file is always big-endian
- try (PinotDataBuffer dataBuffer = PinotDataBuffer
- .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
- getClass().getSimpleName());
- FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
- for (int i = 0; i < numValues; i++) {
- byte[] value = sortedStringBytes[i];
- writer.writeUnpaddedString(i, _numBytesPerEntry, value);
- }
- }
+ writeBytesValueDictionary(sortedStringBytes);
LOGGER.info(
"Created dictionary for STRING column: {} with cardinality: {}, max length in bytes: {}, range: {} to {}",
_fieldSpec.getName(), numValues, _numBytesPerEntry, sortedStrings[0], sortedStrings[numValues - 1]);
@@ -185,22 +184,15 @@ public class SegmentDictionaryCreator implements Closeable {
Preconditions.checkState(numValues > 0);
_bytesValueToIndexMap = new Object2IntOpenHashMap<>(numValues);
+ byte[][] sortedByteArrays = new byte[sortedBytes.length][];
for (int i = 0; i < numValues; i++) {
ByteArray value = sortedBytes[i];
+ sortedByteArrays[i] = value.getBytes();
_bytesValueToIndexMap.put(value, i);
_numBytesPerEntry = Math.max(_numBytesPerEntry, value.getBytes().length);
}
- // Backward-compatible: index file is always big-endian
- try (PinotDataBuffer dataBuffer = PinotDataBuffer
- .mapFile(_dictionaryFile, false, 0, (long) numValues * _numBytesPerEntry, ByteOrder.BIG_ENDIAN,
- getClass().getSimpleName());
- FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
- for (int i = 0; i < numValues; i++) {
- byte[] value = sortedBytes[i].getBytes();
- writer.writeUnpaddedString(i, _numBytesPerEntry, value);
- }
- }
+ writeBytesValueDictionary(sortedByteArrays);
LOGGER.info(
"Created dictionary for BYTES column: {} with cardinality: {}, max length in bytes: {}, range: {} to {}",
_fieldSpec.getName(), numValues, _numBytesPerEntry, sortedBytes[0], sortedBytes[numValues - 1]);
@@ -211,6 +203,42 @@ public class SegmentDictionaryCreator implements Closeable {
}
}
+ /**
+ * Helper method to write the given sorted byte[][] to an immutable bytes value dictionary.
+ * The dictionary implementation is chosen based on configuration at column level:
+ * <ul>
+ *   <li>variable length: written through {@link VarLengthBytesValueReaderWriter}, sized by its
+ *       {@code getRequiredSize}.</li>
+ *   <li>fixed length: every value is written into a slot of {@code _numBytesPerEntry} bytes.</li>
+ * </ul>
+ * <p>For the fixed length layout the caller must already have set {@code _numBytesPerEntry} to the
+ * maximum value length in bytes.
+ *
+ * @param sortedByteArrays The actual sorted byte arrays to be written to the store.
+ * @throws IOException if the dictionary file cannot be mapped or written.
+ */
+ private void writeBytesValueDictionary(byte[][] sortedByteArrays)
+ throws IOException {
+ if (_useVarLengthDictionary) {
+ // New format; written big-endian to stay consistent with the fixed length dictionary files.
+ long size = VarLengthBytesValueReaderWriter.getRequiredSize(sortedByteArrays);
+ try (PinotDataBuffer dataBuffer = PinotDataBuffer
+ .mapFile(_dictionaryFile, false, 0, size, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
+ // Constructing the writer with the values serializes them into the mapped buffer.
+ VarLengthBytesValueReaderWriter writer = new VarLengthBytesValueReaderWriter(dataBuffer, sortedByteArrays)) {
+ LOGGER.info("Using variable length bytes dictionary for column: {}, size: {}, numElements: {}",
+ _fieldSpec.getName(), size, writer.getNumElements());
+ }
+ } else {
+ // Backward-compatible: index file is always big-endian
+ // Widen before multiplying so large dictionaries do not overflow int.
+ long size = (long) sortedByteArrays.length * _numBytesPerEntry;
+ try (PinotDataBuffer dataBuffer = PinotDataBuffer
+ .mapFile(_dictionaryFile, false, 0, size, ByteOrder.BIG_ENDIAN, getClass().getSimpleName());
+ FixedByteValueReaderWriter writer = new FixedByteValueReaderWriter(dataBuffer)) {
+ for (int i = 0; i < sortedByteArrays.length; i++) {
+ writer.writeUnpaddedString(i, _numBytesPerEntry, sortedByteArrays[i]);
+ }
+ LOGGER.info("Using fixed bytes value dictionary for column: {}, size: {}", _fieldSpec.getName(), size);
+ }
+ }
+ }
+
public int getNumBytesPerEntry() {
return _numBytesPerEntry;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 645165d..248f16d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -25,10 +25,12 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.MetricFieldSpec;
@@ -482,8 +484,10 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
}
ColumnStatistics columnProfile = segmentStats.getColumnProfileFor(column);
+ Set<String> varLengthDictionaryColumns = new HashSet<>(config.getVarLengthDictionaryColumns());
indexCreationInfoMap.put(column,
- new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/, ForwardIndexType.FIXED_BIT_COMPRESSED,
+ new ColumnIndexCreationInfo(columnProfile, true/*createDictionary*/,
+ varLengthDictionaryColumns.contains(column), ForwardIndexType.FIXED_BIT_COMPRESSED,
InvertedIndexType.ROARING_BITMAPS, false/*isAutoGenerated*/,
dataSchema.getFieldSpecFor(column).getDefaultNullValue()));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 7560ec9..7dcda28 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -46,6 +46,7 @@ public class IndexLoadingConfig {
private Set<String> _invertedIndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
private Map<String, String> _noDictionaryConfig = new HashMap<>();
+ private Set<String> _varLengthDictionaryColumns = new HashSet<>();
private Set<String> _onHeapDictionaryColumns = new HashSet<>();
private Set<String> _bloomFilterColumns = new HashSet<>();
@@ -95,6 +96,11 @@ public class IndexLoadingConfig {
_noDictionaryConfig.putAll(noDictionaryConfig);
}
+ List<String> varLengthDictionaryColumns = indexingConfig.getVarLengthDictionaryColumns();
+ if (varLengthDictionaryColumns != null) {
+ _varLengthDictionaryColumns.addAll(varLengthDictionaryColumns);
+ }
+
List<String> onHeapDictionaryColumns = indexingConfig.getOnHeapDictionaryColumns();
if (onHeapDictionaryColumns != null) {
_onHeapDictionaryColumns.addAll(onHeapDictionaryColumns);
@@ -192,6 +198,11 @@ public class IndexLoadingConfig {
}
+ /**
+ * Returns the columns whose dictionaries use the variable length bytes format, as listed in the
+ * indexing config's var-length dictionary columns.
+ *
+ * @return the configured column names; never null, empty when none are configured
+ */
@Nonnull
+ public Set<String> getVarLengthDictionaryColumns() {
+ return _varLengthDictionaryColumns;
+ }
+
+ @Nonnull
public Set<String> getOnHeapDictionaryColumns() {
return _onHeapDictionaryColumns;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 9509982..232bd91 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -311,12 +311,14 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
totalDocs, maxNumberOfMultiValueElements);
ColumnIndexCreationInfo columnIndexCreationInfo =
- new ColumnIndexCreationInfo(columnStatistics, true/*createDictionary*/, ForwardIndexType.FIXED_BIT_COMPRESSED,
- InvertedIndexType.SORTED_INDEX, true/*isAutoGenerated*/, defaultValue/*defaultNullValue*/);
+ new ColumnIndexCreationInfo(columnStatistics, true/*createDictionary*/, false,
+ ForwardIndexType.FIXED_BIT_COMPRESSED, InvertedIndexType.SORTED_INDEX, true/*isAutoGenerated*/,
+ defaultValue/*defaultNullValue*/);
// Create dictionary.
// We will have only one value in the dictionary.
- try (SegmentDictionaryCreator creator = new SegmentDictionaryCreator(sortedArray, fieldSpec, _indexDir)) {
+ try (SegmentDictionaryCreator creator =
+ new SegmentDictionaryCreator(sortedArray, fieldSpec, _indexDir, false)) {
creator.build();
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
index 80541df..181de10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReader.java
@@ -25,6 +25,7 @@ import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.primitive.ByteArray;
import org.apache.pinot.core.io.util.FixedByteValueReaderWriter;
import org.apache.pinot.core.io.util.ValueReader;
+import org.apache.pinot.core.io.util.VarLengthBytesValueReaderWriter;
import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -36,11 +37,18 @@ public abstract class ImmutableDictionaryReader extends BaseDictionary {
private final byte _paddingByte;
protected ImmutableDictionaryReader(PinotDataBuffer dataBuffer, int length, int numBytesPerValue, byte paddingByte) {
- Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue);
- _valueReader = new FixedByteValueReaderWriter(dataBuffer);
+ // The format is detected from the buffer itself, so dictionaries written in the fixed length
+ // format keep loading through the fixed-size reader.
+ if (VarLengthBytesValueReaderWriter.isVarLengthBytesDictBuffer(dataBuffer)) {
+ _valueReader = new VarLengthBytesValueReaderWriter(dataBuffer);
+ // Sentinel: entries have no fixed width and no padding (getBuffer() returns null for -1).
+ _numBytesPerValue = -1;
+ _paddingByte = 0;
+ } else {
+ // Widen to long before multiplying: length * numBytesPerValue can overflow int for large
+ // dictionaries with long entries, which would make this check fail on a valid buffer.
+ Preconditions.checkState(dataBuffer.size() == (long) length * numBytesPerValue,
+ "Dictionary buffer size: %s does not match length: %s * numBytesPerValue: %s", dataBuffer.size(), length,
+ numBytesPerValue);
+ _valueReader = new FixedByteValueReaderWriter(dataBuffer);
+ _numBytesPerValue = numBytesPerValue;
+ _paddingByte = paddingByte;
+ }
_length = length;
- _numBytesPerValue = numBytesPerValue;
- _paddingByte = paddingByte;
}
protected ImmutableDictionaryReader(ValueReader valueReader, int length) {
@@ -244,6 +252,6 @@ public abstract class ImmutableDictionaryReader extends BaseDictionary {
}
+ /**
+ * Allocates a scratch buffer sized to one fixed length dictionary entry.
+ *
+ * @return a new {@code byte[_numBytesPerValue]}, or {@code null} for variable length dictionaries
+ *         ({@code _numBytesPerValue == -1}), whose entries have no fixed width.
+ *         NOTE(review): presumably the value reader allocates per value when handed null — confirm.
+ */
protected byte[] getBuffer() {
- return new byte[_numBytesPerValue];
+ return _numBytesPerValue == -1 ? null : new byte[_numBytesPerValue];
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
index 51fcdb0..c56da38 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentLocalFSDirectory.java
@@ -126,7 +126,7 @@ class SegmentLocalFSDirectory extends SegmentDirectory {
try {
return FileUtils.sizeOfDirectory(segmentDirectory.toPath().toFile());
} catch (IllegalArgumentException e) {
- LOGGER.error("Failed to read disk size for direcotry: ", segmentDirectory.getAbsolutePath());
+ LOGGER.error("Failed to read disk size for directory: ", segmentDirectory.getAbsolutePath());
return -1;
}
} else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index ceed347..5974cb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -52,6 +52,7 @@ public class MutableSegmentImplAggregateMetricsTest {
.build();
_mutableSegmentImpl = MutableSegmentImplTestUtils
.createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(DIMENSION_1, METRIC)),
+ new HashSet<>(Arrays.asList(DIMENSION_1)),
Collections.singleton(DIMENSION_1), true);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index af1a0ab..70ca0bb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -76,7 +76,8 @@ public class MutableSegmentImplTest {
_schema = config.getSchema();
_mutableSegmentImpl = MutableSegmentImplTestUtils
- .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), false);
+ .createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(),
+ Collections.emptySet(),false);
_lastIngestionTimeMs = System.currentTimeMillis();
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
_startTimeMs = System.currentTimeMillis();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
index 96e7bda..b60bed5 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTestUtils.java
@@ -39,7 +39,8 @@ public class MutableSegmentImplTestUtils {
private static final String STEAM_NAME = "testStream";
public static MutableSegmentImpl createMutableSegmentImpl(@Nonnull Schema schema,
- @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
+ @Nonnull Set<String> noDictionaryColumns, @Nonnull Set<String> varLengthDictionaryColumns,
+ @Nonnull Set<String> invertedIndexColumns, boolean aggregateMetrics) {
RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
@@ -47,6 +48,7 @@ public class MutableSegmentImplTestUtils {
RealtimeSegmentConfig realtimeSegmentConfig =
new RealtimeSegmentConfig.Builder().setSegmentName(SEGMENT_NAME).setStreamName(STEAM_NAME).setSchema(schema)
.setCapacity(100000).setAvgNumMultiValues(2).setNoDictionaryColumns(noDictionaryColumns)
+ .setVarLengthDictionaryColumns(varLengthDictionaryColumns)
.setInvertedIndexColumns(invertedIndexColumns).setRealtimeSegmentZKMetadata(new RealtimeSegmentZKMetadata())
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).build();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
new file mode 100644
index 0000000..069da08
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/io/util/VarLengthBytesValueReaderWriterTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link VarLengthBytesValueReaderWriter}.
+ *
+ * <p>Each test writes values through the writing constructor into a freshly mapped temp file, then
+ * re-maps the same file read-only and checks that the element count and every value read back match.
+ */
+public class VarLengthBytesValueReaderWriterTest {
+ private final Random random = new Random();
+
+ @Test
+ public void testEmptyDictionary()
+ throws IOException {
+ byte[][] byteArrays = new byte[][]{};
+ // Even with zero values the format needs 20 bytes (presumably its header).
+ Assert.assertEquals(VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays), 20);
+ writeAndVerify(byteArrays);
+ }
+
+ @Test
+ public void testSingleByteArray()
+ throws IOException {
+ byte[][] byteArrays = new byte[][]{{1, 2, 3, 4}};
+ Assert.assertEquals(VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays), 28);
+ writeAndVerify(byteArrays);
+ }
+
+ @Test
+ public void testArbitraryLengthByteArray()
+ throws IOException {
+ // At least one value so the read-back loop is always exercised; value i has length i + 1.
+ int numByteArrays = 1 + random.nextInt(100);
+ byte[][] byteArrays = new byte[numByteArrays][];
+ for (int i = 0; i < numByteArrays; i++) {
+ byteArrays[i] = new byte[i + 1];
+ random.nextBytes(byteArrays[i]);
+ }
+ writeAndVerify(byteArrays);
+ }
+
+ /**
+ * Writes {@code byteArrays} into a new temp file, re-maps it read-only, and asserts that the element
+ * count and each value round-trip unchanged. The temp file is always deleted, even on failure.
+ */
+ private void writeAndVerify(byte[][] byteArrays)
+ throws IOException {
+ long size = VarLengthBytesValueReaderWriter.getRequiredSize(byteArrays);
+ File tempFile =
+ new File(FileUtils.getTempDirectory(), VarLengthBytesValueReaderWriterTest.class.getName() + random.nextInt());
+ try {
+ // Write phase: the two-argument constructor serializes the values into the buffer.
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, false, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ VarLengthBytesValueReaderWriter writer = new VarLengthBytesValueReaderWriter(buffer, byteArrays);
+ Assert.assertEquals(writer.getNumElements(), byteArrays.length);
+ }
+
+ // Read phase: map the same file read-only and read every value back.
+ try (PinotDataBuffer buffer = PinotDataBuffer.mapFile(tempFile, true, 0, size, ByteOrder.BIG_ENDIAN, null)) {
+ VarLengthBytesValueReaderWriter reader = new VarLengthBytesValueReaderWriter(buffer);
+ Assert.assertEquals(reader.getNumElements(), byteArrays.length);
+ for (int i = 0; i < byteArrays.length; i++) {
+ Assert.assertTrue(Arrays.equals(reader.getBytes(i, -1, null), byteArrays[i]),
+ "Value mismatch at index: " + i);
+ }
+ }
+ } finally {
+ // deleteQuietly (not forceDelete) so a cleanup failure never masks the real assertion error.
+ FileUtils.deleteQuietly(tempFile);
+ }
+ }
+}
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
index dcc63ca..83fb6e2 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDictionaryCreation.java
@@ -129,6 +129,18 @@ public class BenchmarkDictionaryCreation {
}
}
+ /**
+ * Measures building the STRING dictionary for {@code _sortedStrings} in the variable length bytes format
+ * ({@code useVarLengthDictionary = true}).
+ *
+ * @return the dictionary index of {@code "0"}; returned so the built dictionary is consumed
+ *         (presumably to keep JMH from treating the work as dead code)
+ */
+ @Benchmark
+ @BenchmarkMode(Mode.SampleTime)
+ @OutputTimeUnit(TimeUnit.MILLISECONDS)
+ public int benchmarkVarLengthStringDictionaryCreation()
+ throws IOException {
+ try (SegmentDictionaryCreator dictionaryCreator =
+ new SegmentDictionaryCreator(_sortedStrings, STRING_FIELD, INDEX_DIR, true)) {
+ dictionaryCreator.build();
+ return dictionaryCreator.indexOfSV("0");
+ }
+ }
+
@TearDown
public void tearDown()
throws Exception {
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
index c703a3e..10b3370 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/StringDictionaryPerfTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.perf;
+import com.google.common.base.Joiner;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.pinot.common.data.DimensionFieldSpec;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
@@ -45,12 +47,18 @@ import org.apache.pinot.core.segment.index.readers.Dictionary;
* Performance test for lookup in string dictionary.
*/
public class StringDictionaryPerfTest {
- private static final int MAX_STRING_LENGTH = 100;
+ private static final int MAX_STRING_LENGTH = 1000;
+ private static final boolean USE_FIXED_SIZE_STRING = true;
private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
private static final String COLUMN_NAME = "test";
- private static final int TOTAL_NUM_LOOKUPS = 100_000;
-
- String[] _inputStrings;
+ private static final String[] STATS_HEADERS = new String[] {
+ "DictSize", "TimeTaken(ms)", "SegmentSize", "NumLookups", "Min", "Max", "Mean",
+ "StdDev", "Median", "Skewness", "Kurtosis", "Variance", "BufferSize"
+ };
+ private static final Joiner COMMA_JOINER = Joiner.on(",");
+
+ private final DescriptiveStatistics _statistics = new DescriptiveStatistics();
+ private String[] _inputStrings;
private File _indexDir;
private int _dictLength;
@@ -64,7 +72,7 @@ public class StringDictionaryPerfTest {
* @param dictLength Length of the dictionary
* @throws Exception
*/
- public void buildSegment(int dictLength)
+ private void buildSegment(int dictLength)
throws Exception {
Schema schema = new Schema();
String segmentName = "perfTestSegment" + System.currentTimeMillis();
@@ -89,14 +97,17 @@ public class StringDictionaryPerfTest {
int i = 0;
while (i < dictLength) {
HashMap<String, Object> map = new HashMap<>();
- String randomString = RandomStringUtils.randomAlphanumeric(1 + random.nextInt(MAX_STRING_LENGTH));
+ String randomString = RandomStringUtils.randomAlphanumeric(
+ USE_FIXED_SIZE_STRING ? MAX_STRING_LENGTH : (1 + random.nextInt(MAX_STRING_LENGTH)));
if (uniqueStrings.contains(randomString)) {
continue;
}
_inputStrings[i] = randomString;
- uniqueStrings.add(randomString);
+ if (uniqueStrings.add(randomString)) {
+ _statistics.addValue(randomString.length());
+ }
map.put("test", _inputStrings[i++]);
GenericRow genericRow = new GenericRow();
@@ -130,20 +141,61 @@ public class StringDictionaryPerfTest {
}
FileUtils.deleteQuietly(_indexDir);
- System.out.println("Total time for " + TOTAL_NUM_LOOKUPS + " lookups: " + (System.currentTimeMillis() - start));
+ System.out.println("Total time for " + numLookups + " lookups: "
+ + (System.currentTimeMillis() - start) + "ms");
}
- public static void main(String[] args)
- throws Exception {
- if (args.length != 2) {
- System.out.println("Usage: StringDictionaryPerfRunner <dictionary_length> <num_lookups> ");
+ /**
+ * Measures the performance of string dictionary lookups by dictId, performing the provided
+ * number of reads from the dictionary at random indices. The loaded segment is destroyed and the
+ * index directory deleted afterwards.
+ *
+ * @param numGetValues Number of values to read from the dictionary
+ * @return one CSV row: dictionary size, lookup time (ms), segment size, number of lookups, then
+ *         min/max/mean/stddev/median/skewness/kurtosis/variance of the generated string lengths.
+ *         NOTE(review): STATS_HEADERS also declares a trailing "BufferSize" column that this row does
+ *         not populate.
+ * @throws Exception
+ */
+ private String[] perfTestGetValues(int numGetValues) throws Exception {
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(_indexDir, ReadMode.heap);
+ long time;
+ long segmentSize;
+ try {
+ Dictionary dictionary = immutableSegment.getDictionary(COLUMN_NAME);
+ Random random = new Random(System.nanoTime());
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < numGetValues; i++) {
+ dictionary.get(random.nextInt(_dictLength));
+ }
+ // Stop the clock before any size query or cleanup so only the lookups are measured.
+ time = System.currentTimeMillis() - start;
+ segmentSize = immutableSegment.getSegmentSizeBytes();
+ } finally {
+ immutableSegment.destroy();
+ FileUtils.deleteQuietly(_indexDir);
+ }
+
+ System.out.println("Total time for " + numGetValues + " lookups: " + time + "ms");
+ return new String[] {String.valueOf(_statistics.getN()), String.valueOf(time),
+ String.valueOf(segmentSize), String.valueOf(numGetValues),
+ String.valueOf(_statistics.getMin()), String.valueOf(_statistics.getMax()),
+ String.valueOf(_statistics.getMean()), String.valueOf(_statistics.getStandardDeviation()),
+ String.valueOf(_statistics.getPercentile(50.0D)),
+ String.valueOf(_statistics.getSkewness()), String.valueOf(_statistics.getKurtosis()),
+ String.valueOf(_statistics.getVariance())
+ };
+ }
+
+ /**
+ * Entry point: builds one segment per {@code <dictionary_length>} argument, runs the random get-value
+ * benchmark against each, and prints the results as CSV (header row first).
+ *
+ * @param args one or more dictionary lengths followed by the number of lookups
+ */
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.out.println("Usage: StringDictionaryPerfTest <dictionary_length> <dictionary_length> ... <num_lookups> ");
+ // Without this return, args[args.length - 1] below throws when no arguments are given.
+ return;
}
- int dictLength = Integer.valueOf(args[0]);
- int numLookups = Integer.valueOf(args[1]);
+ int numLookups = Integer.parseInt(args[args.length - 1]);
- StringDictionaryPerfTest test = new StringDictionaryPerfTest();
- test.buildSegment(dictLength);
- test.perfTestLookups(numLookups);
+ // Row 0 is the header; each dictionary length argument contributes one stats row.
+ String[][] stats = new String[args.length][];
+ stats[0] = STATS_HEADERS;
+ for (int i = 0; i < args.length - 1; i++) {
+ int dictLength = Integer.parseInt(args[i]);
+ StringDictionaryPerfTest test = new StringDictionaryPerfTest();
+ test.buildSegment(dictLength);
+ stats[i + 1] = test.perfTestGetValues(numLookups);
+ }
+ for (String[] row : stats) {
+ System.out.println(COMMA_JOINER.join(row));
+ }
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
index 52ba2b4..8baf3c6 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java
@@ -59,6 +59,7 @@ public class MemoryEstimator {
private long _sampleCompletedSegmentSizeBytes;
private Set<String> _invertedIndexColumns = new HashSet<>();
private Set<String> _noDictionaryColumns = new HashSet<>();
+ private Set<String> _varLengthDictionaryColumns = new HashSet<>();
int _avgMultiValues;
private File _tableDataDir;
@@ -83,6 +84,9 @@ public class MemoryEstimator {
if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getNoDictionaryColumns())) {
_noDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getNoDictionaryColumns());
}
+ if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns())) {
+ _varLengthDictionaryColumns.addAll(_tableConfig.getIndexingConfig().getVarLengthDictionaryColumns());
+ }
if (CollectionUtils.isNotEmpty(_tableConfig.getIndexingConfig().getInvertedIndexColumns())) {
_invertedIndexColumns.addAll(_tableConfig.getIndexingConfig().getInvertedIndexColumns());
}
@@ -123,8 +127,11 @@ public class MemoryEstimator {
new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
.setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
.setCapacity(_segmentMetadata.getTotalDocs()).setAvgNumMultiValues(_avgMultiValues)
- .setNoDictionaryColumns(_noDictionaryColumns).setInvertedIndexColumns(_invertedIndexColumns)
- .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true).setMemoryManager(memoryManager)
+ .setNoDictionaryColumns(_noDictionaryColumns)
+ .setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+ .setInvertedIndexColumns(_invertedIndexColumns)
+ .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(true)
+ .setMemoryManager(memoryManager)
.setStatsHistory(sampleStatsHistory);
// create mutable segment impl
@@ -220,8 +227,11 @@ public class MemoryEstimator {
RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName())
.setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema())
- .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues).setNoDictionaryColumns(_noDictionaryColumns)
- .setInvertedIndexColumns(_invertedIndexColumns).setRealtimeSegmentZKMetadata(segmentZKMetadata)
+ .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues)
+ .setNoDictionaryColumns(_noDictionaryColumns)
+ .setVarLengthDictionaryColumns(_varLengthDictionaryColumns)
+ .setInvertedIndexColumns(_invertedIndexColumns)
+ .setRealtimeSegmentZKMetadata(segmentZKMetadata)
.setOffHeap(true).setMemoryManager(memoryManager).setStatsHistory(statsHistory);
// create mutable segment impl
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org