Posted to commits@pinot.apache.org by ki...@apache.org on 2021/10/18 03:46:44 UTC
[pinot] branch mv-fwd-index updated: Wiring in the segment creation driver Impl
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/mv-fwd-index by this push:
new e8bca30 Wiring in the segment creation driver Impl
e8bca30 is described below
commit e8bca30473c28e9e9efe371bd06442955da7d433
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 17 20:46:09 2021 -0700
Wiring in the segment creation driver Impl
---
.../apache/pinot/common/utils/PinotDataType.java | 47 ++-
.../pinot/core/minion/RawIndexConverter.java | 2 +-
.../tests/BaseClusterIntegrationTest.java | 3 +-
.../impl/VarByteChunkSVForwardIndexWriter.java | 43 ++-
.../creator/impl/SegmentColumnarIndexCreator.java | 422 ++++++++++++++++-----
.../fwd/MultiValueFixedByteRawIndexCreator.java | 22 +-
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 23 +-
.../stats/BytesColumnPredIndexStatsCollector.java | 56 ++-
.../local/segment/store/FilePerIndexDirectory.java | 7 +-
.../MultiValueVarByteRawIndexCreatorTest.java | 45 ++-
.../segment/index/creator/RawIndexCreatorTest.java | 147 +++++--
.../converter/DictionaryToRawIndexConverter.java | 2 +-
12 files changed, 627 insertions(+), 192 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 49406ba..a94fee9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -29,7 +29,6 @@ import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimestampUtils;
-
/**
* The <code>PinotDataType</code> enum represents the data type of a value in a row from recordReader and provides
* utility methods to convert value across types if applicable.
@@ -583,7 +582,8 @@ public enum PinotDataType {
try {
return Base64.getDecoder().decode(value.toString());
} catch (Exception e) {
- throw new RuntimeException("Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
+ throw new RuntimeException(
+ "Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
e);
}
}
@@ -769,6 +769,12 @@ public enum PinotDataType {
return sourceType.toStringArray(value);
}
},
+ BYTES_ARRAY {
+ @Override
+ public byte[][] convert(Object value, PinotDataType sourceType) {
+ return sourceType.toBytesArray(value);
+ }
+ },
OBJECT_ARRAY;
@@ -817,7 +823,8 @@ public enum PinotDataType {
return JsonUtils.objectToString(value);
} catch (Exception e) {
throw new RuntimeException(
- "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: " + value, e);
+ "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: "
+ + value, e);
}
}
}
@@ -1020,6 +1027,24 @@ public enum PinotDataType {
}
}
+ public byte[][] toBytesArray(Object value) {
+ if (value instanceof byte[][]) {
+ return (byte[][]) value;
+ }
+ if (isSingleValue()) {
+ return new byte[][]{toBytes(value)};
+ } else {
+ Object[] valueArray = toObjectArray(value);
+ int length = valueArray.length;
+ byte[][] bytesArray = new byte[length][];
+ PinotDataType singleValueType = getSingleValueType();
+ for (int i = 0; i < length; i++) {
+ bytesArray[i] = singleValueType.toBytes(valueArray[i]);
+ }
+ return bytesArray;
+ }
+ }
+
private static Object[] toObjectArray(Object array) {
Class<?> componentType = array.getClass().getComponentType();
if (componentType.isPrimitive()) {
@@ -1042,7 +1067,8 @@ public enum PinotDataType {
}
public Object convert(Object value, PinotDataType sourceType) {
- throw new UnsupportedOperationException("Cannot convert value from " + sourceType + " to " + this);
+ throw new UnsupportedOperationException(
+ "Cannot convert value from " + sourceType + " to " + this);
}
/**
@@ -1082,6 +1108,8 @@ public enum PinotDataType {
return DOUBLE;
case STRING_ARRAY:
return STRING;
+ case BYTES_ARRAY:
+ return BYTES;
case OBJECT_ARRAY:
return OBJECT;
default:
@@ -1151,6 +1179,9 @@ public enum PinotDataType {
if (cls == Short.class) {
return SHORT_ARRAY;
}
+ if (cls == byte[].class) {
+ return BYTES_ARRAY;
+ }
return OBJECT_ARRAY;
}
@@ -1210,7 +1241,8 @@ public enum PinotDataType {
if (fieldSpec.isSingleValueField()) {
return BYTES;
} else {
- throw new IllegalStateException("There is no multi-value type for BYTES");
+ return BYTES_ARRAY;
+// throw new IllegalStateException("There is no multi-value type for BYTES");
}
default:
throw new UnsupportedOperationException(
@@ -1253,7 +1285,8 @@ public enum PinotDataType {
case STRING_ARRAY:
return STRING_ARRAY;
default:
- throw new IllegalStateException("Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
+ throw new IllegalStateException(
+ "Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
}
}
}
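
As a minimal sketch of the BYTES_ARRAY conversion path added above (a fragment with illustrative values; only the API shown in this diff is assumed): a byte[][] maps to the new BYTES_ARRAY type and passes through unchanged, while a single-value source is wrapped via toBytes().

    import org.apache.pinot.common.utils.PinotDataType;

    // Fragment with illustrative values; assumes only the API shown in this diff.
    byte[][] mvValue = {{0x01, 0x02}, {0x03}};
    // toBytesArray() returns a byte[][] input as-is:
    byte[][] unchanged = (byte[][]) PinotDataType.BYTES_ARRAY.convert(mvValue, PinotDataType.BYTES_ARRAY);
    // A single-value BYTES source is wrapped into a one-element array:
    byte[][] wrapped = PinotDataType.BYTES.toBytesArray(new byte[]{0x0A});
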
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f1c7fbb..a3b2e24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -207,7 +207,7 @@ public class RawIndexConverter {
int numDocs = _originalSegmentMetadata.getTotalDocs();
int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
+ .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ab5e028..ae36cc6 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -81,11 +81,12 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
- Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+ Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "RandomAirports");
protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
+ private static final List<String> DEFAULT_TEXT_INDEX_COLUMNS = Collections.singletonList("RandomAirports");
protected static final int DEFAULT_NUM_REPLICAS = 1;
protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index 4444512..44ea807 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -25,34 +25,35 @@ import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
-
/**
* Class to write out variable length bytes into a single column.
*
* The layout of the file is as follows:
* <p> Header Section: </p>
* <ul>
- * <li> Integer: File format version. </li>
- * <li> Integer: Total number of chunks. </li>
- * <li> Integer: Number of docs per chunk. </li>
- * <li> Integer: Length of longest entry (in bytes). </li>
- * <li> Integer: Total number of docs (version 2 onwards). </li>
- * <li> Integer: Compression type enum value (version 2 onwards). </li>
- * <li> Integer: Start offset of data header (version 2 onwards). </li>
- * <li> Integer array: Integer offsets for all chunks in the data (upto version 2),
- * Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
+ * <li> Integer: File format version. </li>
+ * <li> Integer: Total number of chunks. </li>
+ * <li> Integer: Number of docs per chunk. </li>
+ * <li> Integer: Length of longest entry (in bytes). </li>
+ * <li> Integer: Total number of docs (version 2 onwards). </li>
+ * <li> Integer: Compression type enum value (version 2 onwards). </li>
+ * <li> Integer: Start offset of data header (version 2 onwards). </li>
+ * <li> Integer array: Integer offsets for all chunks in the data (up to version 2),
+ * Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
* </ul>
*
* <p> Individual Chunks: </p>
* <ul>
- * <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for missing rows. </li>
- * <li> Data bytes. </li>
+ * <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for
+ * missing rows. </li>
+ * <li> Data bytes. </li>
* </ul>
*
* Only sequential writes are supported.
*/
@NotThreadSafe
public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+
public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
private final int _chunkHeaderSize;
@@ -68,9 +69,11 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
* @param numDocsPerChunk Number of documents per chunk.
* @param lengthOfLongestEntry Length of longest entry (in bytes)
* @param writerVersion writer format version
- * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
+ * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is
+ * not found.
*/
- public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
+ public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType,
+ int totalDocs,
int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
throws FileNotFoundException {
super(file, compressionType, totalDocs, numDocsPerChunk,
@@ -119,12 +122,12 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
/**
* Helper method to compress and write the current chunk.
* <ul>
- * <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled chunks. </li>
- * <li> Compresses and writes the chunk to the data file. </li>
- * <li> Updates the header with the current chunks offset. </li>
- * <li> Clears up the buffers, so that they can be reused. </li>
+ * <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled
+ * chunks. </li>
+ * <li> Compresses and writes the chunk to the data file. </li>
+ * <li> Updates the header with the current chunk's offset. </li>
+ * <li> Clears up the buffers, so that they can be reused. </li>
* </ul>
- *
*/
protected void writeChunk() {
// For partially filled chunks, we still need to clear the offsets for remaining rows, as we reuse this buffer.
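
The header layout documented in the class javadoc above can be read back mechanically. A sketch (not part of this commit) of a version-3 header reader that follows the documented field order:

    import java.nio.ByteBuffer;

    // Sketch: read the file header documented above (writer version 3).
    static long[] readChunkOffsets(ByteBuffer buf) {
      int version = buf.getInt();               // file format version
      int numChunks = buf.getInt();             // total number of chunks
      int numDocsPerChunk = buf.getInt();       // number of docs per chunk
      int lengthOfLongestEntry = buf.getInt();  // in bytes
      int totalDocs = buf.getInt();             // version 2 onwards
      int compressionType = buf.getInt();       // compression type enum value
      int dataHeaderStart = buf.getInt();       // start offset of data header
      long[] chunkOffsets = new long[numChunks];
      for (int i = 0; i < numChunks; i++) {
        chunkOffsets[i] = buf.getLong();        // long offsets, version 3 onwards
      }
      return chunkOffsets;
    }
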
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2829e8e..9d6b32b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.File;
import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,7 +36,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.utils.FileUtils;
import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -82,12 +86,12 @@ import org.slf4j.LoggerFactory;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
-
/**
* Segment creator which writes data in a columnar form.
*/
// TODO: check resource leaks
public class SegmentColumnarIndexCreator implements SegmentCreator {
+
// TODO Refactor class name to match interface name
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
// Allow at most 512 characters for the metadata property
@@ -112,7 +116,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
private Map<String, Map<String, String>> _columnProperties;
@Override
- public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
+ public void init(SegmentGeneratorConfig segmentCreationSpec,
+ SegmentIndexCreationInfo segmentIndexCreationInfo,
Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
throws Exception {
_docIdCounter = 0;
@@ -121,7 +126,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
_columnProperties = segmentCreationSpec.getColumnProperties();
// Check that the output directory does not exist
- Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
+ Preconditions
+ .checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
Preconditions.checkState(outDir.mkdirs(), "Failed to create output directory: %s", outDir);
_indexDir = outDir;
@@ -164,7 +170,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
for (String columnName : h3IndexConfigs.keySet()) {
Preconditions
- .checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema",
+ .checkState(schema.hasColumn(columnName),
+ "Cannot create H3 index for column: %s because it is not in schema",
columnName);
}
@@ -178,15 +185,18 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
String columnName = fieldSpec.getName();
DataType storedType = fieldSpec.getDataType().getStoredType();
ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
- Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
- boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
+ Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s",
+ columnName);
+ boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec,
+ fieldSpec);
if (dictEnabledColumn) {
// Create dictionary-encoded index
// Initialize dictionary creator
SegmentDictionaryCreator dictionaryCreator =
- new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+ new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
+ fieldSpec, _indexDir,
indexCreationInfo.isUseVarLengthDictionary());
_dictionaryCreatorMap.put(columnName, dictionaryCreator);
@@ -194,8 +204,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
try {
dictionaryCreator.build();
} catch (Exception e) {
- LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
- fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry());
+ LOGGER.error(
+ "Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
+ fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(),
+ dictionaryCreator.getNumBytesPerEntry());
throw e;
}
@@ -204,14 +216,17 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (fieldSpec.isSingleValueField()) {
if (indexCreationInfo.isSorted()) {
_forwardIndexCreatorMap
- .put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
+ .put(columnName,
+ new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
} else {
_forwardIndexCreatorMap.put(columnName,
- new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
+ new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+ _totalDocs));
}
} else {
_forwardIndexCreatorMap.put(columnName,
- new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs,
+ new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+ _totalDocs,
indexCreationInfo.getTotalNumberOfEntries()));
}
@@ -219,7 +234,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
if (segmentCreationSpec.isOnHeap()) {
_invertedIndexCreatorMap
- .put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
+ .put(columnName,
+ new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
} else {
_invertedIndexCreatorMap.put(columnName,
new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
@@ -230,20 +246,33 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
// Create raw index
// TODO: add support to multi-value column and inverted index
- Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
- columnName);
+// Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
+// columnName);
Preconditions.checkState(!invertedIndexColumns.contains(columnName),
"Cannot create inverted index for raw index column: %s", columnName);
- ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec);
+ ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec,
+ fieldSpec);
// Initialize forward index creator
boolean deriveNumDocsPerChunk =
shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
- int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
- _forwardIndexCreatorMap.put(columnName,
- getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
- indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
+ int writerVersion = rawIndexWriterVersion(columnName,
+ segmentCreationSpec.getColumnProperties());
+ if (fieldSpec.isSingleValueField()) {
+ _forwardIndexCreatorMap.put(columnName,
+ getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType,
+ _totalDocs,
+ indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk,
+ writerVersion));
+ } else {
+ _forwardIndexCreatorMap.put(columnName,
+ getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType,
+ _totalDocs,
+ indexCreationInfo.getLengthOfLongestEntry(),
+ indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
+ writerVersion));
+ }
}
if (textIndexColumns.contains(columnName)) {
@@ -251,18 +280,22 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
Preconditions.checkState(fieldSpec.isSingleValueField(),
"Text index is currently only supported on single-value columns");
Preconditions
- .checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns");
+ .checkState(storedType == DataType.STRING,
+ "Text index is currently only supported on STRING type columns");
_textIndexCreatorMap
- .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+ .put(columnName,
+ new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
}
if (fstIndexColumns.contains(columnName)) {
Preconditions.checkState(fieldSpec.isSingleValueField(),
"FST index is currently only supported on single-value columns");
Preconditions
- .checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns");
+ .checkState(storedType == DataType.STRING,
+ "FST index is currently only supported on STRING type columns");
Preconditions
- .checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns");
+ .checkState(dictEnabledColumn,
+ "FST index is currently only supported on dictionary-encoded columns");
_fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
(String[]) indexCreationInfo.getSortedUniqueElementsArray()));
}
@@ -271,7 +304,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
Preconditions.checkState(fieldSpec.isSingleValueField(),
"Json index is currently only supported on single-value columns");
Preconditions
- .checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns");
+ .checkState(storedType == DataType.STRING,
+ "Json index is currently only supported on STRING columns");
JsonIndexCreator jsonIndexCreator =
segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
: new OffHeapJsonIndexCreator(_indexDir, columnName);
@@ -281,11 +315,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
if (h3IndexConfig != null) {
Preconditions
- .checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns");
- Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
+ .checkState(fieldSpec.isSingleValueField(),
+ "H3 index is currently only supported on single-value columns");
+ Preconditions.checkState(storedType == DataType.BYTES,
+ "H3 index is currently only supported on BYTES columns");
H3IndexResolution resolution = h3IndexConfig.getResolution();
GeoSpatialIndexCreator h3IndexCreator =
- segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution)
+ segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName,
+ resolution)
: new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
_h3IndexCreatorMap.put(columnName, h3IndexCreator);
}
@@ -293,7 +330,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
_nullHandlingEnabled = _config.isNullHandlingEnabled();
if (_nullHandlingEnabled) {
// Initialize Null value vector map
- _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName));
+ _nullValueVectorCreatorMap
+ .put(columnName, new NullValueVectorCreator(_indexDir, columnName));
}
}
}
@@ -308,7 +346,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
return false;
}
- public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
+ public static int rawIndexWriterVersion(String columnName,
+ Map<String, Map<String, String>> columnProperties) {
if (columnProperties != null && columnProperties.get(columnName) != null) {
Map<String, String> properties = columnProperties.get(columnName);
String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
@@ -321,20 +360,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
/**
- * Helper method that returns compression type to use based on segment creation spec and field type.
+ * Helper method that returns compression type to use based on segment creation spec and field
+ * type.
* <ul>
- * <li> Returns compression type from segment creation spec, if specified there.</li>
- * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely
- * to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that
- * case, clients are expected to explicitly specify the appropriate compression type in the spec. </li>
+ * <li> Returns compression type from segment creation spec, if specified there.</li>
+ * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics
+ * are likely
+ * to be spread in different chunks after applying predicates. Same could be true for dimensions,
+ * but in that
+ * case, clients are expected to explicitly specify the appropriate compression type in the spec.
+ * </li>
* </ul>
+ *
* @param segmentCreationSpec Segment creation spec
* @param fieldSpec Field spec for the column
* @return Compression type to use
*/
private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec,
FieldSpec fieldSpec) {
- ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
+ ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType()
+ .get(fieldSpec.getName());
if (compressionType == null) {
if (fieldSpec.getFieldType() == FieldType.METRIC) {
return ChunkCompressionType.PASS_THROUGH;
@@ -350,8 +395,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* Returns true if dictionary should be created for a column, false otherwise.
* Currently there are two sources for this config:
* <ul>
- * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
- * <li> SegmentGeneratorConfig</li>
+ * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+ * <li> SegmentGeneratorConfig</li>
* </ul>
*
* This method gives preference to the SegmentGeneratorConfig first.
@@ -361,15 +406,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* @param spec Field spec for the column
* @return True if dictionary should be created for the column, false otherwise
*/
- private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+ private boolean createDictionaryForColumn(ColumnIndexCreationInfo info,
+ SegmentGeneratorConfig config,
FieldSpec spec) {
String column = spec.getName();
if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
.containsKey(column)) {
- if (!spec.isSingleValueField()) {
- throw new RuntimeException(
- "Creation of indices without dictionaries is supported for single valued columns only.");
- }
+// if (!spec.isSingleValueField()) {
+// throw new RuntimeException(
+// "Creation of indices without dictionaries is supported for single valued columns only.");
+// }
return false;
}
return info.isCreateDictionary();
@@ -387,16 +433,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
throw new RuntimeException("Null value for column:" + columnName);
}
- boolean isSingleValue = _schema.getFieldSpecFor(columnName).isSingleValueField();
+ FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+ // Get dictionaryCreator; it will be null if the column is not dictionary-encoded
SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
- if (isSingleValue) {
- // SV column
- // text-index enabled SV column
- TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
- if (textIndexCreator != null) {
- textIndexCreator.add((String) columnValueToIndex);
- }
+ // text-index
+ TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
+ if (textIndexCreator != null) {
+ textIndexCreator.add((String) columnValueToIndex);
+ }
+
+ if (fieldSpec.isSingleValueField()) {
+ // Single Value column
JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName);
if (jsonIndexCreator != null) {
jsonIndexCreator.add((String) columnValueToIndex);
@@ -411,7 +460,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
// store the docID -> dictID mapping in forward index
forwardIndexCreator.putDictId(dictId);
- DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+ DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+ .get(columnName);
if (invertedIndexCreator != null) {
// if inverted index enabled during segment creation,
// then store dictID -> docID mapping in inverted index
@@ -423,7 +473,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
// for text index on raw columns, check the config to determine if actual raw value should
// be stored or not
- columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+ columnValueToIndex = _columnProperties.get(columnName)
+ .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
if (columnValueToIndex == null) {
columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
}
@@ -452,12 +503,120 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
} else {
- // MV column (always dictionary encoded)
- int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
- forwardIndexCreator.putDictIdMV(dictIds);
- DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
- if (invertedIndexCreator != null) {
- invertedIndexCreator.add(dictIds, dictIds.length);
+ if (dictionaryCreator != null) {
+ //dictionary encoded
+ int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+ forwardIndexCreator.putDictIdMV(dictIds);
+ DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+ .get(columnName);
+ if (invertedIndexCreator != null) {
+ invertedIndexCreator.add(dictIds, dictIds.length);
+ }
+ } else {
+ // for text index on raw columns, check the config to determine if actual raw value should
+ // be stored or not
+ if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+ Object value = _columnProperties.get(columnName)
+ .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+ if (value == null) {
+ value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+ }
+ if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+ value = String.valueOf(value);
+ int length = ((String[]) columnValueToIndex).length;
+ columnValueToIndex = new String[length];
+ Arrays.fill((String[]) columnValueToIndex, value);
+ } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+ int length = ((byte[][]) columnValueToIndex).length;
+ columnValueToIndex = new byte[length][];
+ Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+ } else {
+ throw new RuntimeException(
+ "Text Index is only supported for STRING and BYTES stored type");
+ }
+ }
+ switch (forwardIndexCreator.getValueType()) {
+ case INT:
+ if (columnValueToIndex instanceof int[]) {
+ forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ int[] array = new int[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putIntMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ case LONG:
+ if (columnValueToIndex instanceof long[]) {
+ forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ long[] array = new long[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Long) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putLongMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ case FLOAT:
+ if (columnValueToIndex instanceof float[]) {
+ forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ float[] array = new float[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Float) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putFloatMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ case DOUBLE:
+ if (columnValueToIndex instanceof double[]) {
+ forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ double[] array = new double[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (Double) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putDoubleMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ case STRING:
+ if (columnValueToIndex instanceof String[]) {
+ forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ String[] array = new String[((Object[]) columnValueToIndex).length];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (String) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putStringMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ case BYTES:
+ if (columnValueToIndex instanceof byte[][]) {
+ forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
+ } else if (columnValueToIndex instanceof Object[]) {
+ byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
+ for (int i = 0; i < array.length; i++) {
+ array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+ }
+ forwardIndexCreator.putBytesMV(array);
+ } else {
+ //TODO: is this possible?
+ }
+ break;
+ default:
+ throw new IllegalStateException();
+ }
}
}
@@ -468,6 +627,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
}
+
_docIdCounter++;
}
@@ -491,7 +651,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
@Override
public void seal()
throws ConfigurationException, IOException {
- for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
+ for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap
+ .values()) {
invertedIndexCreator.seal();
}
for (TextIndexCreator textIndexCreator : _textIndexCreatorMap.values()) {
@@ -515,10 +676,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
private void writeMetadata()
throws ConfigurationException {
PropertiesConfiguration properties =
- new PropertiesConfiguration(new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
+ new PropertiesConfiguration(
+ new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
properties.setProperty(SEGMENT_CREATOR_VERSION, _config.getCreatorVersion());
- properties.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+ properties.setProperty(SEGMENT_PADDING_CHARACTER,
+ String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
properties.setProperty(SEGMENT_NAME, _segmentName);
properties.setProperty(TABLE_NAME, _config.getTableName());
properties.setProperty(DIMENSIONS, _config.getDimensions());
@@ -530,7 +693,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
// Write time related metadata (start time, end time, time unit)
if (timeColumnName != null) {
- ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap.get(timeColumnName);
+ ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap
+ .get(timeColumnName);
if (timeColumnIndexCreationInfo != null) {
long startTime;
long endTime;
@@ -548,7 +712,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
// For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
- DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(_config.getSimpleDateFormat());
+ DateTimeFormatter dateTimeFormatter = DateTimeFormat
+ .forPattern(_config.getSimpleDateFormat());
startTime = dateTimeFormatter.parseMillis(startTimeStr);
endTime = dateTimeFormatter.parseMillis(endTimeStr);
timeUnit = TimeUnit.MILLISECONDS;
@@ -575,10 +740,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (!_config.isSkipTimeValueCheck()) {
Interval timeInterval =
- new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+ new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime),
+ DateTimeZone.UTC);
Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
"Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
- timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
+ timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(),
+ timeColumnName,
TimeUtils.VALID_TIME_INTERVAL);
}
@@ -596,7 +763,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
String column = entry.getKey();
ColumnIndexCreationInfo columnIndexCreationInfo = entry.getValue();
SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(column);
- int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
+ int dictionaryElementSize =
+ (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
// TODO: after fixing the server-side dependency on HAS_INVERTED_INDEX and deployed, set HAS_INVERTED_INDEX
// properly
@@ -620,17 +788,24 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
boolean hasJsonIndex = _jsonIndexCreatorMap.containsKey(column);
- addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
- _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex, textIndexType,
+ addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs,
+ _schema.getFieldSpecFor(column),
+ _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex,
+ textIndexType,
hasFSTIndex, hasJsonIndex);
}
properties.save();
+// StringWriter sw = new StringWriter();
+// properties.save(sw);
+// System.out.println(sw.toString());
}
public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
- ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
- int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType, boolean hasFSTIndex,
+ ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec,
+ boolean hasDictionary,
+ int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType,
+ boolean hasFSTIndex,
boolean hasJsonIndex) {
int cardinality = columnIndexCreationInfo.getDistinctValueCount();
properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
@@ -639,35 +814,44 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT),
String.valueOf(PinotDataBitSet.getNumBitsPerValue(cardinality - 1)));
- properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize));
- properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
- properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted()));
- properties.setProperty(getKeyFor(column, HAS_NULL_VALUE), String.valueOf(columnIndexCreationInfo.hasNulls()));
+ properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE),
+ String.valueOf(dictionaryElementSize));
+ properties
+ .setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
+ properties.setProperty(getKeyFor(column, IS_SORTED),
+ String.valueOf(columnIndexCreationInfo.isSorted()));
+ properties.setProperty(getKeyFor(column, HAS_NULL_VALUE),
+ String.valueOf(columnIndexCreationInfo.hasNulls()));
properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), textIndexType.name());
properties.setProperty(getKeyFor(column, HAS_INVERTED_INDEX), String.valueOf(hasInvertedIndex));
properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex));
- properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+ properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED),
+ String.valueOf(fieldSpec.isSingleValueField()));
properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
properties
- .setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+ .setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+ String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
if (partitionFunction != null) {
properties.setProperty(getKeyFor(column, PARTITION_FUNCTION), partitionFunction.toString());
- properties.setProperty(getKeyFor(column, NUM_PARTITIONS), columnIndexCreationInfo.getNumPartitions());
- properties.setProperty(getKeyFor(column, PARTITION_VALUES), columnIndexCreationInfo.getPartitions());
+ properties.setProperty(getKeyFor(column, NUM_PARTITIONS),
+ columnIndexCreationInfo.getNumPartitions());
+ properties.setProperty(getKeyFor(column, PARTITION_VALUES),
+ columnIndexCreationInfo.getPartitions());
}
// datetime field
if (fieldSpec.getFieldType().equals(FieldType.DATE_TIME)) {
DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
- properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+ properties
+ .setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
}
// NOTE: Min/max could be null for real-time aggregate metrics.
@@ -685,7 +869,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
- public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
+ public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column,
+ String minValue,
String maxValue) {
if (isValidPropertyValue(minValue)) {
properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
@@ -699,9 +884,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* Helper method to check whether the given value is a valid property value.
* <p>Value is invalid iff:
* <ul>
- * <li>It contains more than 512 characters</li>
- * <li>It contains leading/trailing whitespace</li>
- * <li>It contains list separator (',')</li>
+ * <li>It contains more than 512 characters</li>
+ * <li>It contains leading/trailing whitespace</li>
+ * <li>It contains list separator (',')</li>
* </ul>
*/
@VisibleForTesting
@@ -713,7 +898,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
if (length > METADATA_PROPERTY_LENGTH_LIMIT) {
return false;
}
- if (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(length - 1))) {
+ if (Character.isWhitespace(value.charAt(0)) || Character
+ .isWhitespace(value.charAt(length - 1))) {
return false;
}
return value.indexOf(',') == -1;
@@ -731,13 +917,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* @param column Column name
* @param totalDocs Total number of documents to index
* @param lengthOfLongestEntry Length of longest entry
- * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+ * per chunk
* @param writerVersion version to use for the raw index writer
* @return raw index creator
- * @throws IOException
*/
- public static ForwardIndexCreator getRawIndexCreatorForColumn(File file, ChunkCompressionType compressionType,
- String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
+ ChunkCompressionType compressionType,
+ String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
+ boolean deriveNumDocsPerChunk,
int writerVersion)
throws IOException {
switch (dataType.getStoredType()) {
@@ -745,14 +933,56 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
case LONG:
case FLOAT:
case DOUBLE:
- return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+ return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+ dataType,
writerVersion);
case STRING:
case BYTES:
- return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+ return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+ dataType,
lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
default:
- throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+ throw new UnsupportedOperationException(
+ "Data type not supported for raw indexing: " + dataType);
+ }
+ }
+
+ /**
+ * Helper method to build the raw index creator for the column.
+ * Assumes that the column to be indexed is multi-valued.
+ *
+ * @param file Output index file
+ * @param column Column name
+ * @param totalDocs Total number of documents to index
+ * @param lengthOfLongestEntry Length of longest entry
+ * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+ * per chunk
+ * @param writerVersion version to use for the raw index writer
+ * @return raw index creator
+ */
+ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
+ ChunkCompressionType compressionType,
+ String column, DataType dataType, final int totalDocs,
+ int lengthOfLongestEntry,
+ final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
+ int writerVersion)
+ throws IOException {
+ switch (dataType.getStoredType()) {
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+ dataType, dataType.getStoredType().size(), maxNumberOfMultiValueElements,
+ deriveNumDocsPerChunk, writerVersion);
+ case STRING:
+ case BYTES:
+ return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+ dataType, lengthOfLongestEntry, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
+ writerVersion);
+ default:
+ throw new UnsupportedOperationException(
+ "Data type not supported for raw indexing: " + dataType);
}
}
@@ -760,8 +990,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
public void close()
throws IOException {
FileUtils.close(Iterables
- .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
- _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(),
+ .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(),
+ _invertedIndexCreatorMap.values(),
+ _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(),
+ _jsonIndexCreatorMap.values(),
_h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
}
}
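
A hypothetical call site for the new getRawIndexCreatorForMVColumn factory, mirroring the wiring in init() above; the column name and sizes are made up for illustration:

    // Fragment; indexDir is a hypothetical output directory.
    try (ForwardIndexCreator creator = SegmentColumnarIndexCreator.getRawIndexCreatorForMVColumn(
        indexDir, ChunkCompressionType.SNAPPY, "RandomAirports", DataType.STRING,
        /* totalDocs */ 1000, /* lengthOfLongestEntry */ 32,
        /* maxNumberOfMultiValueElements */ 8, /* deriveNumDocsPerChunk */ false,
        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION)) {
      creator.putStringMV(new String[]{"SFO", "JFK"});
    }
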
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index d608a65..de11500 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -51,13 +52,14 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
- * @param maxLength length of longest entry (in bytes)
*/
public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
String column,
- int totalDocs, DataType valueType, int maxLength)
+ int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+ final int maxNumberOfMultiValueElements)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+ this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+ maxNumberOfMultiValueElements, false,
BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
}
@@ -69,23 +71,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
- * @param maxLength length of longest entry (in bytes)
+ * @param maxLengthOfEachEntry length of longest entry (in bytes)
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
* @param writerVersion writer format version
*/
public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column,
- int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk,
+ String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+ final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
int writerVersion)
throws IOException {
File file = new File(baseIndexDir,
- column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
FileUtils.deleteQuietly(file);
+ int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
int numDocsPerChunk =
- deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+ deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
_indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
- numDocsPerChunk, maxLength,
- writerVersion);
+ numDocsPerChunk, totalMaxLength, writerVersion);
_valueType = valueType;
}
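
A hypothetical construction of the fixed-byte MV creator with the reordered parameters; per the constructor above, the writer budgets maxNumberOfMultiValueElements * maxLengthOfEachEntry bytes per row. Values here are illustrative:

    // Fragment; indexDir and the column name are made up for illustration.
    MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(
        indexDir, ChunkCompressionType.SNAPPY, "DivAirportIDs",
        /* totalDocs */ 1000, DataType.INT,
        /* maxLengthOfEachEntry */ Integer.BYTES,
        /* maxNumberOfMultiValueElements */ 5);
    creator.putIntMV(new int[]{100, 200, 300});
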
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 465b5f7..9264bde 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -65,9 +65,8 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
String column,
int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength,
- maxElements,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ this(baseIndexDir, compressionType, column, totalDocs, valueType, maxTotalContentLength,
+ maxElements, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
}
/**
@@ -78,25 +77,24 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
- * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
- * @param maxTotalContentLength max total content length
+ * @param maxLength max length for each entry
* @param maxElements max number of elements
+ * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per
+ * chunk
* @param writerVersion writer format version
*/
public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column,
- int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength,
- int maxElements,
- int writerVersion)
+ String column, int totalDocs, DataType valueType,
+ int maxLength, int maxElements, boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
//we will prepend the actual content with numElements and length array containing length of each element
- int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+ int totalMaxLength = Integer.BYTES + maxElements * Integer.BYTES + maxLength * maxElements;
File file = new File(baseIndexDir,
column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk =
- deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+ deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
_indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
- numDocsPerChunk, maxLength,
+ numDocsPerChunk, totalMaxLength,
writerVersion);
_valueType = valueType;
}
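
The var-byte bound deserves the same treatment, since the patch changes the parameter's meaning from a total content length to a per-entry length. A sketch of the worst-case row size, mirroring the totalMaxLength expression above (inputs illustrative):

    // Worst-case bytes per row for the var-byte MV writer.
    static int varByteRowBudget(int maxLength, int maxElements) {
      int header = Integer.BYTES + maxElements * Integer.BYTES;  // numElements + per-element length array
      return header + maxLength * maxElements;  // e.g. maxLength=100, maxElements=50 -> 204 + 5000 = 5204
    }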
@@ -151,6 +149,7 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
@Override
public void putBytesMV(final byte[][] values) {
int totalBytes = 0;
for (int i = 0; i < values.length; i++) {
int length = values[i].length;
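
Per the constructor comment, each row that putBytesMV writes is a count, a length array, then the concatenated payloads. A self-contained sketch of that layout (illustrative only, not the patch's code; the actual writes go through VarByteChunkSVForwardIndexWriter):

    import java.nio.ByteBuffer;

    // Mirrors the "numElements + length array + content" row shape.
    static byte[] encodeRow(byte[][] values) {
      int content = 0;
      for (byte[] v : values) {
        content += v.length;                    // total payload bytes
      }
      ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + values.length * Integer.BYTES + content);
      buf.putInt(values.length);                // numElements
      for (byte[] v : values) {
        buf.putInt(v.length);                   // length of each element
      }
      for (byte[] v : values) {
        buf.put(v);                             // concatenated element bytes
      }
      return buf.array();
    }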
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index a0cfd66..908be20 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,14 +21,15 @@ package org.apache.pinot.segment.local.segment.creator.impl.stats;
import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import java.util.Arrays;
import java.util.Set;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.spi.utils.ByteArray;
-
/**
* Extension of {@link AbstractColumnStatisticsCollector} for byte[] column type.
*/
public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatisticsCollector {
+
private final Set<ByteArray> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
private int _minLength = Integer.MAX_VALUE;
@@ -36,22 +37,36 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
private ByteArray[] _sortedValues;
private boolean _sealed = false;
- public BytesColumnPredIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
+ public BytesColumnPredIndexStatsCollector(String column,
+ StatsCollectorConfig statsCollectorConfig) {
super(column, statsCollectorConfig);
}
@Override
public void collect(Object entry) {
- ByteArray value = new ByteArray((byte[]) entry);
- addressSorted(value);
- updatePartition(value);
- _values.add(value);
-
- int length = value.length();
- _minLength = Math.min(_minLength, length);
- _maxLength = Math.max(_maxLength, length);
-
- _totalNumberOfEntries++;
+ if (entry instanceof Object[]) {
+ Object[] values = (Object[]) entry;
+ for (Object obj : values) {
+ ByteArray value = new ByteArray((byte[]) obj);
+ _values.add(value);
+ int length = value.length();
+ _minLength = Math.min(_minLength, length);
+ _maxLength = Math.max(_maxLength, length);
+ }
+ _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
+ updateTotalNumberOfEntries(values);
+ } else {
+ ByteArray value = new ByteArray((byte[]) entry);
+ addressSorted(value);
+ updatePartition(value);
+ _values.add(value);
+
+ int length = value.length();
+ _minLength = Math.min(_minLength, length);
+ _maxLength = Math.max(_maxLength, length);
+
+ _totalNumberOfEntries++;
+ }
}
@Override
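
With this change collect() dispatches on the runtime type: a bare byte[] takes the single-value path, an Object[] of byte[] takes the multi-value path. A hedged usage sketch (construction of the collector and its StatsCollectorConfig omitted; seal() is assumed from the base class, as the IllegalStateException messages below imply):

    static void feed(BytesColumnPredIndexStatsCollector collector) {
      collector.collect(new byte[]{1, 2, 3});                             // single-value path
      collector.collect(new Object[]{new byte[]{1}, new byte[]{2, 3}});   // multi-value path, updates _maxNumberOfMultiValues
      collector.seal();  // required before reading min/max/cardinality
    }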
@@ -59,7 +74,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
if (_sealed) {
return _sortedValues[0];
}
- throw new IllegalStateException("you must seal the collector first before asking for min value");
+ throw new IllegalStateException(
+ "you must seal the collector first before asking for min value");
}
@Override
@@ -67,7 +83,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
if (_sealed) {
return _sortedValues[_sortedValues.length - 1];
}
- throw new IllegalStateException("you must seal the collector first before asking for max value");
+ throw new IllegalStateException(
+ "you must seal the collector first before asking for max value");
}
@Override
@@ -75,7 +92,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
if (_sealed) {
return _sortedValues;
}
- throw new IllegalStateException("you must seal the collector first before asking for unique values set");
+ throw new IllegalStateException(
+ "you must seal the collector first before asking for unique values set");
}
@Override
@@ -88,7 +106,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
if (_sealed) {
return _maxLength;
}
- throw new IllegalStateException("you must seal the collector first before asking for longest value");
+ throw new IllegalStateException(
+ "you must seal the collector first before asking for longest value");
}
@Override
@@ -96,7 +115,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
if (_sealed) {
return _sortedValues.length;
}
- throw new IllegalStateException("you must seal the collector first before asking for cardinality");
+ throw new IllegalStateException(
+ "you must seal the collector first before asking for cardinality");
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index aba55ca..71e62cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
@@ -167,7 +168,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
}
} else {
- fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ if (!columnMetadata.hasDictionary()) {
+ fileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+ } else {
+ fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+ }
}
break;
case INVERTED_INDEX:
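
The net effect of the FilePerIndexDirectory hunk: the forward-index extension for a multi-value column now keys off the dictionary flag. Restated as a one-liner with the same identifiers (columnMetadata assumed in scope):

    String fileExtension = columnMetadata.hasDictionary()
        ? V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION  // dictionary-encoded MV
        : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;      // raw MV, new in this patch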
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 373c3a9..e98a185 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -39,7 +39,7 @@ public class MultiValueVarByteRawIndexCreatorTest {
}
@Test
- public void testMV() throws IOException {
+ public void testMVString() throws IOException {
String column = "testCol";
int numDocs = 1000;
int maxElements = 50;
@@ -78,4 +78,47 @@ public class MultiValueVarByteRawIndexCreatorTest {
Assert.assertEquals(inputs.get(i), readValue);
}
}
+
+ @Test
+ public void testMVBytes() throws IOException {
+ String column = "testCol";
+ int numDocs = 1000;
+ int maxElements = 50;
+ int maxTotalLength = 500;
+ File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+ new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
+ maxTotalLength, maxElements);
+ List<byte[][]> inputs = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < numDocs; i++) {
+ int length = random.nextInt(10);
+ byte[][] values = new byte[length][];
+ for (int j = 0; j < length; j++) {
+ char[] value = new char[length];
+ Arrays.fill(value, 'a');
+ values[j] = new String(value).getBytes();
+ }
+ inputs.add(values);
+ creator.putBytesMV(values);
+ }
+ creator.close();
+
+ //read
+ final PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+ DataType.BYTES);
+ final ChunkReaderContext context = reader.createContext();
+ byte[][] values = new byte[maxElements][];
+ for (int i = 0; i < numDocs; i++) {
+ int length = reader.getBytesMV(i, values, context);
+ byte[][] readValue = Arrays.copyOf(values, length);
+ for (int j = 0; j < length; j++) {
+ Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
+ }
+ }
+ }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 9f515e8..b9f0f15 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -55,15 +57,16 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
/**
* Class for testing Raw index creators.
*/
public class RawIndexCreatorTest {
+
private static final int NUM_ROWS = 10009;
private static final int MAX_STRING_LENGTH = 101;
- private static final String SEGMENT_DIR_NAME = System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest";
+ private static final String SEGMENT_DIR_NAME =
+ System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest";
private static final String SEGMENT_NAME = "testSegment";
private static final String INT_COLUMN = "intColumn";
@@ -71,6 +74,12 @@ public class RawIndexCreatorTest {
private static final String FLOAT_COLUMN = "floatColumn";
private static final String DOUBLE_COLUMN = "doubleColumn";
private static final String STRING_COLUMN = "stringColumn";
+ private static final String INT_MV_COLUMN = "intMVColumn";
+ private static final String LONG_MV_COLUMN = "longMVColumn";
+ private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+ private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+ private static final String STRING_MV_COLUMN = "stringMVColumn";
+ private static final String BYTES_MV_COLUMN = "bytesMVColumn";
Random _random;
private RecordReader _recordReader;
@@ -79,8 +88,6 @@ public class RawIndexCreatorTest {
/**
* Setup to build a segment with raw indexes (no-dictionary) of various data types.
- *
- * @throws Exception
*/
@BeforeClass
public void setup()
@@ -91,8 +98,15 @@ public class RawIndexCreatorTest {
schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT, true));
schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE, true));
schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING, true));
+ schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT, false));
+ schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG, false));
+ schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT, false));
+ schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE, false));
+ schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING, false));
+ schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES, false));
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test")
+ .build();
_random = new Random(System.nanoTime());
_recordReader = buildIndex(tableConfig, schema);
@@ -109,7 +123,6 @@ public class RawIndexCreatorTest {
/**
* Test for int raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testIntRawIndexCreator()
@@ -120,7 +133,6 @@ public class RawIndexCreatorTest {
/**
* Test for long raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testLongRawIndexCreator()
@@ -131,7 +143,6 @@ public class RawIndexCreatorTest {
/**
* Test for float raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testFloatRawIndexCreator()
@@ -142,7 +153,6 @@ public class RawIndexCreatorTest {
/**
* Test for double raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testDoubleRawIndexCreator()
@@ -153,19 +163,21 @@ public class RawIndexCreatorTest {
/**
* Test for string raw index creator.
* Compares values read from the raw index against expected value.
- * @throws Exception
*/
@Test
public void testStringRawIndexCreator()
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
- try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer,
+ try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(
+ indexBuffer,
DataType.STRING);
- BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+ .createContext()) {
_recordReader.rewind();
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
- Assert.assertEquals(rawIndexReader.getString(row, readerContext), expectedRow.getValue(STRING_COLUMN));
+ Assert.assertEquals(rawIndexReader.getString(row, readerContext),
+ expectedRow.getValue(STRING_COLUMN));
}
}
}
@@ -175,17 +187,88 @@ public class RawIndexCreatorTest {
*
* @param column Column for which to perform the test
* @param dataType Data type of the column
- * @throws Exception
*/
private void testFixedLengthRawIndexCreator(String column, DataType dataType)
throws Exception {
PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
- try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(indexBuffer,
- dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+ try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+ indexBuffer,
+ dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+ .createContext()) {
+ _recordReader.rewind();
+ for (int row = 0; row < NUM_ROWS; row++) {
+ GenericRow expectedRow = _recordReader.next();
+ Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row),
+ expectedRow.getValue(column));
+ }
+ }
+ }
+
+ /**
+ * Test for multi value string raw index creator.
+ * Compares values read from the raw index against expected value.
+ */
+ @Test
+ public void testStringMVRawIndexCreator()
+ throws Exception {
+ PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+ indexBuffer,
+ DataType.STRING);
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+ .createContext()) {
_recordReader.rewind();
+ int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+ .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
+ final String[] valueBuffer = new String[maxNumberOfMultiValues];
for (int row = 0; row < NUM_ROWS; row++) {
GenericRow expectedRow = _recordReader.next();
- Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), expectedRow.getValue(column));
+
+ int length = rawIndexReader.getStringMV(row, valueBuffer, readerContext);
+ String[] readValue = Arrays.copyOf(valueBuffer, length);
+ Object[] writtenValue = (Object[]) expectedRow.getValue(STRING_MV_COLUMN);
+ if (writtenValue == null || writtenValue.length == 0) {
+ writtenValue = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+ }
+ for (int i = 0; i < length; i++) {
+ Object expected = writtenValue[i];
+ Object actual = readValue[i];
+ Assert.assertEquals(expected, actual);
+ }
+ }
+ }
+ }
+
+ /**
+ * Test for multi value bytes raw index creator.
+ * Compares values read from the raw index against expected value.
+ */
+ @Test
+ public void testBytesMVRawIndexCreator()
+ throws Exception {
+ PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
+ try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+ indexBuffer, DataType.BYTES);
+ BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+ .createContext()) {
+ _recordReader.rewind();
+ int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+ .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
+ final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][];
+ for (int row = 0; row < NUM_ROWS; row++) {
+ GenericRow expectedRow = _recordReader.next();
+
+ int length = rawIndexReader.getBytesMV(row, valueBuffer, readerContext);
+ byte[][] readValue = Arrays.copyOf(valueBuffer, length);
+ Object[] writtenValue = (Object[]) expectedRow.getValue(BYTES_MV_COLUMN);
+ if (writtenValue == null || writtenValue.length == 0) {
+ writtenValue = new byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES};
+ }
+ for (int i = 0; i < length; i++) {
+ Object expected = writtenValue[i];
+ Object actual = readValue[i];
+ Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual));
+ }
}
}
}
@@ -202,10 +285,10 @@ public class RawIndexCreatorTest {
}
/**
- * Helper method to build a segment containing a single valued string column with RAW (no-dictionary) index.
+ * Helper method to build a segment containing single-value and multi-value columns with
+ * RAW (no-dictionary) indexes.
*
* @return Record reader over the rows used to generate the segment.
- * @throws Exception
*/
private RecordReader buildIndex(TableConfig tableConfig, Schema schema)
throws Exception {
@@ -221,9 +304,17 @@ public class RawIndexCreatorTest {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
Object value;
-
- value = getRandomValue(_random, fieldSpec.getDataType());
- map.put(fieldSpec.getName(), value);
+ if (fieldSpec.isSingleValueField()) {
+ value = getRandomValue(_random, fieldSpec.getDataType());
+ map.put(fieldSpec.getName(), value);
+ } else {
+ int length = _random.nextInt(50);
+ Object[] values = new Object[length];
+ for (int j = 0; j < length; j++) {
+ values[j] = getRandomValue(_random, fieldSpec.getDataType());
+ }
+ map.put(fieldSpec.getName(), values);
+ }
}
GenericRow genericRow = new GenericRow();
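
For context on the MV branch above: each multi-value field lands in the row map as an Object[], and _random.nextInt(50) can yield zero elements, which is why the MV read tests fall back to the default null value. A tiny illustration (column constants as in this test):

    import java.util.HashMap;
    import java.util.Map;

    static Map<String, Object> sampleRow() {
      Map<String, Object> row = new HashMap<>();
      row.put(INT_MV_COLUMN, new Object[]{1, 2, 3});  // populated multi-value cell
      row.put(STRING_MV_COLUMN, new Object[0]);       // empty cell; readers surface the default null value
      return row;
    }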
@@ -262,9 +353,15 @@ public class RawIndexCreatorTest {
return random.nextDouble();
case STRING:
return StringUtil
- .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE);
+ .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
+ Integer.MAX_VALUE);
+ case BYTES:
+ return StringUtil
+ .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
+ Integer.MAX_VALUE).getBytes();
default:
- throw new UnsupportedOperationException("Unsupported data type for random value generator: " + dataType);
+ throw new UnsupportedOperationException(
+ "Unsupported data type for random value generator: " + dataType);
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index d3e53141..279ae6d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -308,7 +308,7 @@ public class DictionaryToRawIndexConverter {
int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
- .getRawIndexCreatorForColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
+ .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
ForwardIndexReaderContext readerContext = reader.createContext()) {
switch (storedType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org