You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2021/10/17 07:19:57 UTC
[pinot] 01/01: Initial code for MultiValue forward Index
This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 10b8d0ac8c8a4e911c5355335ed1e182d0c38543
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 17 00:19:04 2021 -0700
Initial code for MultiValue forward Index
---
.../fwd/MultiValueFixedByteRawIndexCreator.java | 179 +++++++++++++++++
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 214 +++++++++++++++++++++
.../stats/AbstractColumnStatisticsCollector.java | 5 +
.../forward/VarByteChunkMVForwardIndexReader.java | 197 +++++++++++++++++++
.../MultiValueVarByteRawIndexCreatorTest.java | 81 ++++++++
.../org/apache/pinot/segment/spi/V1Constants.java | 1 +
.../spi/index/creator/ForwardIndexCreator.java | 9 +
.../spi/index/reader/ForwardIndexReader.java | 19 ++
8 files changed, 705 insertions(+)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..d608a65
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value column of fixed length
+ * data type (INT, LONG,
+ * FLOAT, DOUBLE).
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
+
+ // Number of docs per chunk used when the caller does not ask to derive it from the entry length.
+ private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+ // Target upper bound (in bytes, 1 MiB) used by getNumDocsPerChunk() to size a chunk.
+ private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+ // Each multi-value row is serialized into one byte[] and stored through the var-byte chunk writer.
+ private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ // Value type handed to the constructor; only reported back through getValueType().
+ private final DataType _valueType;
+
+ /**
+ * Create a fixed-byte multi-value raw index creator for the given column, using the default
+ * number of docs per chunk and the default writer version.
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxLength length of longest entry (in bytes). NOTE(review): presumably this has to cover
+ * the whole serialized row (4-byte value count plus all values) - confirm with the callers
+ * @throws IOException propagated from the delegated constructor
+ */
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, int maxLength)
+ throws IOException {
+ this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ }
+
+ /**
+ * Create a fixed-byte multi-value raw index creator for the given column. Any existing index
+ * file for the column is deleted first.
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxLength length of longest entry (in bytes). NOTE(review): presumably this has to cover
+ * the whole serialized row (4-byte value count plus all values) - confirm with the callers
+ * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+ * @param writerVersion writer format version
+ * @throws IOException if the underlying chunk writer cannot be created
+ */
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk,
+ int writerVersion)
+ throws IOException {
+ // This creator writes a multi-value index, so the file must carry the MV raw forward index
+ // extension; with the SV extension it would be indistinguishable from a single-value raw index.
+ File file = new File(baseIndexDir,
+ column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ FileUtils.deleteQuietly(file);
+ int numDocsPerChunk =
+ deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+ _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+ numDocsPerChunk, maxLength,
+ writerVersion);
+ _valueType = valueType;
+ }
+
+ /**
+ * Derives how many docs fit into one chunk of at most {@code TARGET_MAX_CHUNK_SIZE} bytes.
+ *
+ * @param lengthOfLongestEntry length of the longest entry (in bytes)
+ * @return number of docs per chunk, never less than 1
+ */
+ @VisibleForTesting
+ public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+ // Per-row cost: the entry itself plus one row-offset slot.
+ final int bytesPerRow =
+ lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ final int rowsThatFit = TARGET_MAX_CHUNK_SIZE / bytesPerRow;
+ // Keep at least one row per chunk even when a single row exceeds the target size.
+ return rowsThatFit < 1 ? 1 : rowsThatFit;
+ }
+
+ /** Always {@code false}: this creator stores the raw values themselves. */
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ /** Always {@code false}: this creator only accepts multi-value rows (the put*MV methods). */
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ /** Returns the value type that was passed to the constructor. */
+ @Override
+ public DataType getValueType() {
+ return _valueType;
+ }
+
+ /**
+ * Writes the next int multi-value row as one byte[] entry: a 4-byte value count followed by
+ * each value as 4 bytes.
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putIntMV(final int[] values) {
+ final int numValues = values.length;
+ // Layout: [numValues][value 0][value 1]...
+ final byte[] row = new byte[Integer.BYTES + numValues * Integer.BYTES];
+ final ByteBuffer rowWriter = ByteBuffer.wrap(row).putInt(numValues);
+ for (int i = 0; i < numValues; i++) {
+ rowWriter.putInt(values[i]);
+ }
+ _indexWriter.putBytes(row);
+ }
+
+ /**
+ * Writes the next long multi-value row as one byte[] entry: a 4-byte value count followed by
+ * each value as 8 bytes.
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putLongMV(final long[] values) {
+ final int numValues = values.length;
+ // Layout: [numValues][value 0][value 1]...
+ final byte[] row = new byte[Integer.BYTES + numValues * Long.BYTES];
+ final ByteBuffer rowWriter = ByteBuffer.wrap(row).putInt(numValues);
+ for (int i = 0; i < numValues; i++) {
+ rowWriter.putLong(values[i]);
+ }
+ _indexWriter.putBytes(row);
+ }
+
+ /**
+ * Writes the next float multi-value row as one byte[] entry: a 4-byte value count followed by
+ * each value as 4 bytes.
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putFloatMV(final float[] values) {
+ final int numValues = values.length;
+ // Layout: [numValues][value 0][value 1]...
+ final byte[] row = new byte[Integer.BYTES + numValues * Float.BYTES];
+ final ByteBuffer rowWriter = ByteBuffer.wrap(row).putInt(numValues);
+ for (int i = 0; i < numValues; i++) {
+ rowWriter.putFloat(values[i]);
+ }
+ _indexWriter.putBytes(row);
+ }
+
+ /**
+ * Writes the next double multi-value row as one byte[] entry: a 4-byte value count followed by
+ * each value as 8 bytes.
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putDoubleMV(final double[] values) {
+
+ // numValues, bytes required to store the content; sized with Double.BYTES because the payload
+ // is written with putDouble()
+ byte[] bytes = new byte[Integer.BYTES
+ + values.length * Double.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ //write the length
+ byteBuffer.putInt(values.length);
+ //write the content of each element
+ for (final double value : values) {
+ byteBuffer.putDouble(value);
+ }
+ _indexWriter.putBytes(bytes);
+ }
+
+ /**
+ * Closes the underlying chunk writer.
+ *
+ * @throws IOException if closing the writer fails
+ */
+ @Override
+ public void close()
+ throws IOException {
+ _indexWriter.close();
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..465b5f7
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Random;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value column of variable length
+ * data type (STRING,
+ * BYTES).
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+ // Number of docs per chunk used when the caller does not ask to derive it from the entry length.
+ private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+ // Target upper bound (in bytes, 1 MiB) used by getNumDocsPerChunk() to size a chunk.
+ private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+ // Each multi-value row is serialized into one byte[] and stored through the var-byte chunk writer.
+ private final VarByteChunkSVForwardIndexWriter _indexWriter;
+ // Value type handed to the constructor; only reported back through getValueType().
+ private final DataType _valueType;
+
+ /**
+ * Create a var-byte multi-value raw index creator for the given column, using the default
+ * number of docs per chunk and the default writer version.
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param maxTotalContentLength max total content length (in bytes) of all values of one row
+ * @param maxElements max number of elements in one row
+ * @throws IOException propagated from the delegated constructor
+ */
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
+ throws IOException {
+ this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength,
+ maxElements,
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+ }
+
+ /**
+ * Create a var-byte raw index creator for the given column
+ *
+ * @param baseIndexDir Index directory
+ * @param compressionType Type of compression to use
+ * @param column Name of column to index
+ * @param totalDocs Total number of documents to index
+ * @param valueType Type of the values
+ * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+ * @param maxTotalContentLength max total content length
+ * @param maxElements max number of elements
+ * @param writerVersion writer format version
+ */
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+ String column,
+ int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength,
+ int maxElements,
+ int writerVersion)
+ throws IOException {
+ //we will prepend the actual content with numElements and length array containing length of each element
+ int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+ File file = new File(baseIndexDir,
+ column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ int numDocsPerChunk =
+ deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+ _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
+ numDocsPerChunk, maxLength,
+ writerVersion);
+ _valueType = valueType;
+ }
+
+ /**
+ * Derives how many docs fit into one chunk of at most {@code TARGET_MAX_CHUNK_SIZE} bytes.
+ *
+ * @param lengthOfLongestEntry length of the longest entry (in bytes)
+ * @return number of docs per chunk, never less than 1
+ */
+ @VisibleForTesting
+ public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+ // Per-row cost: the entry itself plus one row-offset slot.
+ final int bytesPerRow =
+ lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+ final int rowsThatFit = TARGET_MAX_CHUNK_SIZE / bytesPerRow;
+ // Keep at least one row per chunk even when a single row exceeds the target size.
+ return rowsThatFit < 1 ? 1 : rowsThatFit;
+ }
+
+ /** Always {@code false}: this creator stores the raw values themselves. */
+ @Override
+ public boolean isDictionaryEncoded() {
+ return false;
+ }
+
+ /** Always {@code false}: this creator only accepts multi-value rows (the put*MV methods). */
+ @Override
+ public boolean isSingleValue() {
+ return false;
+ }
+
+ /** Returns the value type that was passed to the constructor. */
+ @Override
+ public DataType getValueType() {
+ return _valueType;
+ }
+
+ /**
+ * Writes the next String multi-value row as one byte[] entry laid out as
+ * [numValues][length of each value][concatenated UTF-8 bytes of all values].
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putStringMV(final String[] values) {
+ // Encode every value exactly once, and always as UTF-8, so that the stored bytes do not depend
+ // on the default charset of the JVM that builds the index.
+ final byte[][] encodedValues = new byte[values.length][];
+ for (int i = 0; i < values.length; i++) {
+ encodedValues[i] = values[i].getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ }
+ // The byte[][] path writes the value count, the length array and the concatenated content.
+ putBytesMV(encodedValues);
+ }
+
+ /**
+ * Writes the next byte[] multi-value row as one byte[] entry laid out as
+ * [numValues][length of each value][concatenated bytes of all values].
+ *
+ * @param values values of the row to write
+ */
+ @Override
+ public void putBytesMV(final byte[][] values) {
+ final int numValues = values.length;
+ int contentLength = 0;
+ for (final byte[] value : values) {
+ contentLength += value.length;
+ }
+ // numValues, length array, concatenated bytes
+ final byte[] row = new byte[Integer.BYTES + Integer.BYTES * numValues + contentLength];
+ final ByteBuffer rowWriter = ByteBuffer.wrap(row);
+ rowWriter.putInt(numValues);
+ // All lengths go first so that a reader can locate any value without scanning the content.
+ for (int i = 0; i < numValues; i++) {
+ rowWriter.putInt(values[i].length);
+ }
+ // todo: the three passes could be merged at the cost of allocating more memory upfront
+ for (int i = 0; i < numValues; i++) {
+ rowWriter.put(values[i]);
+ }
+ _indexWriter.putBytes(row);
+ }
+
+ /**
+ * Closes the underlying chunk writer.
+ *
+ * @throws IOException if closing the writer fails
+ */
+ @Override
+ public void close()
+ throws IOException {
+ _indexWriter.close();
+ }
+
+ /**
+ * Ad-hoc round-trip check for the single-value var-byte raw index: writes {@code numDocs}
+ * random-length strings of 'a' characters to a file in the temp directory, then reads every
+ * value back and prints it to stdout.
+ *
+ * NOTE(review): nothing in this class calls this method; it looks like leftover scratch code that
+ * belongs in a unit test rather than in the creator - confirm and remove.
+ *
+ * @throws IOException if writing or reading the index file fails
+ */
+ private static void testSV() throws IOException {
+ final File dir = new File(System.getProperty("java.io.tmpdir"));
+
+ String column = "testCol";
+ int numDocs = 10000;
+ int maxLength = 100;
+ File file = new File(dir, column + Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ // try-with-resources so the creator is closed even when one of the puts fails
+ try (SingleValueVarByteRawIndexCreator creator = new SingleValueVarByteRawIndexCreator(dir,
+ ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxLength, true,
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION)) {
+ Random random = new Random();
+ for (int i = 0; i < numDocs; i++) {
+ int length = random.nextInt(maxLength);
+ char[] value = new char[length];
+ Arrays.fill(value, 'a');
+ creator.putString(new String(value));
+ }
+ }
+
+ //read; the mapped buffer and the reader are released when the block exits
+ try (PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer,
+ DataType.STRING)) {
+ final ChunkReaderContext context = reader.createContext();
+ for (int i = 0; i < numDocs; i++) {
+ String value = reader.getString(i, context);
+ System.out.println("value = " + value);
+ }
+ }
+ }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
protected int _totalNumberOfEntries = 0;
protected int _maxNumberOfMultiValues = 0;
+ protected int _maxLengthOfMultiValues = 0;
private PartitionFunction _partitionFunction;
private final int _numPartitions;
private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
return _maxNumberOfMultiValues;
}
+ /**
+ * Returns the current value of {@code _maxLengthOfMultiValues}.
+ * NOTE(review): presumably the maximum total length of a multi-value entry seen so far; nothing
+ * visible in this change updates the field - confirm that the concrete collectors maintain it.
+ */
+ public int getMaxLengthOfMultiValues() {
+ return _maxLengthOfMultiValues;
+ }
+
void addressSorted(Object entry) {
if (_isSorted) {
if (_previousValue != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..eef396c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * variable
+ * length data type
+ * (STRING, BYTES).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+ // Size (in bytes) of one row-offset entry in a chunk header, as defined by the writer.
+ private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+ // Computed in the constructor as docsPerChunk * (row offset size + longest entry); used to size
+ // the reader context for compressed indexes.
+ private final int _maxChunkSize;
+
+ // Thread local (reusable) byte[] to read bytes from data file.
+ // NOTE(review): not referenced anywhere else in this class - looks like a leftover; confirm and remove.
+ private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal
+ .withInitial(() -> new byte[_lengthOfLongestEntry]);
+
+ /**
+ * Creates a reader on top of the given data buffer.
+ *
+ * @param dataBuffer buffer holding the index data
+ * @param valueType type of the values
+ */
+ public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+ super(dataBuffer, valueType);
+ // Largest possible chunk: every row has the longest entry plus its row-offset slot.
+ _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+ }
+
+ /**
+ * Creates the reader context used by the read methods.
+ *
+ * @return a context sized for one chunk when the index is compressed, {@code null} otherwise
+ */
+ @Nullable
+ @Override
+ public ChunkReaderContext createContext() {
+ // Only the compressed read path uses a context.
+ return _isCompressed ? new ChunkReaderContext(_maxChunkSize) : null;
+ }
+
+ /**
+ * Reads the String multi-value row of the given document into {@code valueBuffer}.
+ * The row is expected to be laid out as
+ * [numValues][length of each value][concatenated UTF-8 bytes of all values].
+ *
+ * @param docId Document id
+ * @param valueBuffer Value buffer; must be large enough to hold all values of the row
+ * @param context Reader context
+ * @return Number of values within the multi-value entry
+ */
+ @Override
+ public int getStringMV(final int docId, final String[] valueBuffer,
+ final ChunkReaderContext context) {
+ // getBytes() already picks the compressed or uncompressed read path.
+ final byte[] row = getBytes(docId, context);
+ final ByteBuffer rowBuffer = ByteBuffer.wrap(row);
+ final int numValues = rowBuffer.getInt();
+ // The content starts right after the value count and the length array.
+ int contentOffset = (numValues + 1) * Integer.BYTES;
+ for (int i = 0; i < numValues; i++) {
+ final int length = rowBuffer.getInt((i + 1) * Integer.BYTES);
+ // Decode straight from the row bytes, always as UTF-8, so that the result does not depend on
+ // the default charset of the JVM that reads the index.
+ valueBuffer[i] =
+ new String(row, contentOffset, length, java.nio.charset.StandardCharsets.UTF_8);
+ contentOffset += length;
+ }
+ return numValues;
+ }
+
+ /**
+ * Reads the byte[] multi-value row of the given document into {@code valueBuffer}.
+ * The row is expected to be laid out as
+ * [numValues][length of each value][concatenated bytes of all values].
+ *
+ * @param docId Document id
+ * @param valueBuffer Value buffer; must be large enough to hold all values of the row
+ * @param context Reader context
+ * @return Number of values within the multi-value entry
+ */
+ @Override
+ public int getBytesMV(final int docId, final byte[][] valueBuffer,
+ final ChunkReaderContext context) {
+ final byte[] row = _isCompressed ? getBytesCompressed(docId, context) : getBytesUncompressed(docId);
+ final ByteBuffer rowBuffer = ByteBuffer.wrap(row);
+ final int numValues = rowBuffer.getInt();
+ // The content starts right after the value count and the length array.
+ int contentOffset = Integer.BYTES * (numValues + 1);
+ for (int i = 0; i < numValues; i++) {
+ final int length = rowBuffer.getInt(Integer.BYTES * (i + 1));
+ final byte[] value = new byte[length];
+ rowBuffer.position(contentOffset);
+ rowBuffer.get(value, 0, length);
+ valueBuffer[i] = value;
+ contentOffset += length;
+ }
+ return numValues;
+ }
+
+ /**
+ * Returns the serialized bytes of the whole row stored for the given document.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return bytes stored for the document
+ */
+ @Override
+ public byte[] getBytes(int docId, ChunkReaderContext context) {
+ return _isCompressed ? getBytesCompressed(docId, context) : getBytesUncompressed(docId);
+ }
+
+ /**
+ * Helper method to read BYTES value from the compressed index.
+ */
+ private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+ int chunkRowId = docId % _numDocsPerChunk;
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+ // These offsets are offset in the chunk buffer
+ int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+ int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+ byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+ chunkBuffer.position(valueStartOffset);
+ chunkBuffer.get(bytes);
+ return bytes;
+ }
+
+ /**
+ * Helper method to read BYTES value from the uncompressed index.
+ */
+ private byte[] getBytesUncompressed(int docId) {
+ int chunkId = docId / _numDocsPerChunk;
+ int chunkRowId = docId % _numDocsPerChunk;
+
+ // These offsets are offset in the data buffer
+ long chunkStartOffset = getChunkPosition(chunkId);
+ long valueStartOffset =
+ chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+ long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+ byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+ _dataBuffer.copyTo(valueStartOffset, bytes);
+ return bytes;
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the chunk buffer.
+ */
+ private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+ if (rowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return chunkBuffer.limit();
+ } else {
+ int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+ if (valueEndOffset == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+ return chunkBuffer.limit();
+ } else {
+ return valueEndOffset;
+ }
+ }
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the data buffer.
+ */
+ private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+ if (chunkId == _numChunks - 1) {
+ // Last chunk
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the last chunk
+ return _dataBuffer.size();
+ } else {
+ int valueEndOffsetInChunk = _dataBuffer
+ .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+ if (valueEndOffsetInChunk == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+ return _dataBuffer.size();
+ } else {
+ return chunkStartOffset + valueEndOffsetInChunk;
+ }
+ }
+ } else {
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return getChunkPosition(chunkId + 1);
+ } else {
+ return chunkStartOffset + _dataBuffer
+ .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+ }
+ }
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..373c3a9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,81 @@
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+ // Scratch directory under java.io.tmpdir; created in setup() and deleted in cleanup().
+ private static final String OUTPUT_DIR =
+ System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+ /**
+ * Creates the output directory before the tests run
+ */
+ @BeforeClass
+ public void setup() throws Exception {
+ FileUtils.forceMkdir(new File(OUTPUT_DIR));
+ }
+
+ /**
+ * Clean up after test: deletes the output directory, ignoring any failure to do so
+ */
+ @AfterClass
+ public void cleanup() {
+ FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+ }
+
+ @Test
+ public void testMV() throws IOException {
+ String column = "testCol";
+ int numDocs = 1000;
+ int maxElements = 50;
+ int maxTotalLength = 500;
+ File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+ new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING,
+ maxTotalLength, maxElements);
+ List<String[]> inputs = new ArrayList<>();
+ Random random = new Random();
+ for (int i = 0; i < numDocs; i++) {
+ //int length = 1;
+ int length = random.nextInt(10);
+ String[] values = new String[length];
+ for (int j = 0; j < length; j++) {
+ char[] value = new char[length];
+ Arrays.fill(value, 'a');
+ values[j] = new String(value);
+ }
+ inputs.add(values);
+ creator.putStringMV(values);
+ }
+ creator.close();
+
+ //read
+ final PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+ DataType.STRING);
+ final ChunkReaderContext context = reader.createContext();
+ String[] values = new String[maxElements];
+ for (int i = 0; i < numDocs; i++) {
+ int length = reader.getStringMV(i, values, context);
+ String[] readValue = Arrays.copyOf(values, length);
+ Assert.assertEquals(inputs.get(i), readValue);
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index ac7b704..e037544 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+ public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd";
public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
default void putStringMV(String[] values) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Writes the next byte[] type multi-value into the forward index.
+ *
+ * @param values Values to write
+ * @throws UnsupportedOperationException by default; creators that support byte[] multi-values
+ * override this method
+ */
+ default void putBytesMV(byte[][] values) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
default int getStringMV(int docId, String[] valueBuffer, T context) {
throw new UnsupportedOperationException();
}
+
+ /**
+ * Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer size must
+ * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
+ * entry.
+ *
+ * @param docId Document id
+ * @param valueBuffer Value buffer
+ * @param context Reader context
+ * @return Number of values within the multi-value entry
+ * @throws UnsupportedOperationException by default; readers that support bytes multi-values override this method
+ */
+ default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Float multi-value read that takes an additional {@code parentIndices} argument; unsupported by
+ * default.
+ *
+ * NOTE(review): the meaning of {@code parentIndices} is not documented anywhere in this change,
+ * and no implementation is visible here - document the contract or drop the method.
+ *
+ * @param docId Document id
+ * @param valueBuffer Value buffer
+ * @param context Reader context
+ * @param parentIndices NOTE(review): purpose unclear - needs documentation
+ * @return NOTE(review): presumably the number of values within the multi-value entry - confirm
+ * @throws UnsupportedOperationException by default
+ */
+ default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+ throw new UnsupportedOperationException();
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org