You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/25 23:25:28 UTC
[pinot] branch master updated: implement
FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4d6be4 implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)
d4d6be4 is described below
commit d4d6be4e4cc9c84a8da9722b001e8f70ba3781ad
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Tue Oct 26 00:25:13 2021 +0100
implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)
---
.../creator/impl/SegmentColumnarIndexCreator.java | 42 ++---
.../fwd/MultiValueFixedByteRawIndexCreator.java | 32 ++--
.../impl/fwd/MultiValueVarByteRawIndexCreator.java | 31 ++--
.../index/column/PhysicalColumnIndexContainer.java | 13 +-
.../FixedByteChunkMVForwardIndexReader.java | 178 +++++++++++++++++++++
.../MultiValueFixedByteRawIndexCreatorTest.java | 178 +++++++++++++++++++++
.../MultiValueVarByteRawIndexCreatorTest.java | 6 +-
.../spi/index/reader/ForwardIndexReader.java | 4 -
8 files changed, 404 insertions(+), 80 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index dcc0ea2..0ea31d5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.File;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -82,6 +81,7 @@ import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
@@ -476,23 +476,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
}
if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
- value = String.valueOf(value);
- int length = ((String[]) columnValueToIndex).length;
- columnValueToIndex = new String[length];
- Arrays.fill((String[]) columnValueToIndex, value);
+ columnValueToIndex = new String[] {String.valueOf(value)};
} else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
- int length = ((byte[][]) columnValueToIndex).length;
- columnValueToIndex = new byte[length][];
- Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+ columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)};
} else {
throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
}
}
switch (forwardIndexCreator.getValueType()) {
case INT:
- if (columnValueToIndex instanceof int[]) {
- forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
- } else if (columnValueToIndex instanceof Object[]) {
+ if (columnValueToIndex instanceof Object[]) {
int[] array = new int[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
@@ -501,9 +494,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
break;
case LONG:
- if (columnValueToIndex instanceof long[]) {
- forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
- } else if (columnValueToIndex instanceof Object[]) {
+ if (columnValueToIndex instanceof Object[]) {
long[] array = new long[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Long) ((Object[]) columnValueToIndex)[i];
@@ -512,9 +503,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
break;
case FLOAT:
- if (columnValueToIndex instanceof float[]) {
- forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
- } else if (columnValueToIndex instanceof Object[]) {
+ if (columnValueToIndex instanceof Object[]) {
float[] array = new float[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Float) ((Object[]) columnValueToIndex)[i];
@@ -523,9 +512,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
break;
case DOUBLE:
- if (columnValueToIndex instanceof double[]) {
- forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
- } else if (columnValueToIndex instanceof Object[]) {
+ if (columnValueToIndex instanceof Object[]) {
double[] array = new double[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Double) ((Object[]) columnValueToIndex)[i];
@@ -835,10 +822,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* @param writerVersion version to use for the raw index writer
* @return raw index creator
*/
- public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
- ChunkCompressionType compressionType,
- String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
- boolean deriveNumDocsPerChunk,
+ public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
+ String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
int writerVersion)
throws IOException {
switch (dataType.getStoredType()) {
@@ -871,9 +856,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
* @return raw index creator
*/
public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
- String column, DataType dataType, final int totalDocs,
- final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
- int maxRowLengthInBytes)
+ String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
+ boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
throws IOException {
switch (dataType.getStoredType()) {
case INT:
@@ -881,11 +865,11 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
case FLOAT:
case DOUBLE:
return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
- dataType.getStoredType().size(), maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
+ maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
case STRING:
case BYTES:
return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
- maxRowLengthInBytes);
+ maxRowLengthInBytes, maxNumberOfMultiValueElements);
default:
throw new UnsupportedOperationException(
"Data type not supported for raw indexing: " + dataType);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index 572c793..c43f8b7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -18,11 +18,9 @@
*/
package org.apache.pinot.segment.local.segment.creator.impl.fwd;
-import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
@@ -53,12 +51,10 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
*/
- public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column,
- int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
- final int maxNumberOfMultiValueElements)
+ public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+ int totalDocs, DataType valueType, int maxNumberOfMultiValueElements)
throws IOException {
- this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+ this(baseIndexDir, compressionType, column, totalDocs, valueType,
maxNumberOfMultiValueElements, false,
BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
}
@@ -71,33 +67,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
* @param column Name of column to index
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
- * @param maxLengthOfEachEntry length of longest entry (in bytes)
* @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
* @param writerVersion writer format version
*/
public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
- final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
- int writerVersion)
+ String column, int totalDocs, DataType valueType, int maxNumberOfMultiValueElements,
+ boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
- File file = new File(baseIndexDir,
- column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
- FileUtils.deleteQuietly(file);
- int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
+ File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size();
int numDocsPerChunk =
- deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+ deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
+ + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
_indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
numDocsPerChunk, totalMaxLength, writerVersion);
_valueType = valueType;
}
- @VisibleForTesting
- public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
- int overheadPerEntry =
- lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
- return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
- }
-
@Override
public boolean isDictionaryEncoded() {
return false;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 5d5b3cf..abfc436 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -49,13 +49,13 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
* @param maxRowLengthInBytes the length in bytes of the largest row
+ * @param maxNumberOfElements the maximum number of elements in a row
*/
- public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column,
- int totalDocs, DataType valueType, int maxRowLengthInBytes)
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+ int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
this(baseIndexDir, compressionType, column, totalDocs, valueType,
- BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes);
+ BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes, maxNumberOfElements);
}
/**
@@ -67,28 +67,25 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
* @param totalDocs Total number of documents to index
* @param valueType Type of the values
* @param maxRowLengthInBytes the size in bytes of the largest row, the chunk size cannot be smaller than this
+ * @param maxNumberOfElements the maximum number of elements in a row
* @param writerVersion writer format version
*/
- public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
- String column, int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes)
+ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+ int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
//we will prepend the actual content with numElements and length array containing length of each element
- int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes, TARGET_MAX_CHUNK_SIZE);
+ int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
+ int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes;
File file = new File(baseIndexDir,
column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
- int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength);
- _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
- numDocsPerChunk, totalMaxLength,
- writerVersion);
+ int numDocsPerChunk = Math.max(
+ TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
+ 1);
+ _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk,
+ totalMaxLength, writerVersion);
_valueType = valueType;
}
- private static int getNumDocsPerChunk(int lengthOfLongestEntry) {
- int overheadPerEntry =
- lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
- return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
- }
-
@Override
public boolean isDictionaryEncoded() {
return false;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 9ab1e7d..5d8657b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -45,7 +45,9 @@ import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
@@ -194,7 +196,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
}
} else {
// Raw index
- _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
+ _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType(), metadata.isSingleValue());
_dictionary = null;
_rangeIndex = null;
_invertedIndex = null;
@@ -294,17 +296,20 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
}
}
- private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType) {
+ private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType,
+ boolean isSingleValue) {
DataType storedType = dataType.getStoredType();
switch (storedType) {
case INT:
case LONG:
case FLOAT:
case DOUBLE:
- return new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
+ return isSingleValue ? new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+ : new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
case STRING:
case BYTES:
- return new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
+ return isSingleValue ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+ : new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
default:
throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..b2e745d1
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * fixed length data type (INT, LONG, FLOAT, DOUBLE).
+ * <p>Rows hold an int count then the values; for chunk layout see {@link VarByteChunkSVForwardIndexWriter}.
+ */
+public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+ private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+ private final int _maxChunkSize;
+
+ public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+ super(dataBuffer, valueType);
+ _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+ }
+
+ @Nullable
+ @Override
+ public ChunkReaderContext createContext() {
+ if (_isCompressed) {
+ return new ChunkReaderContext(_maxChunkSize);
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getInt();
+ }
+ return numValues;
+ }
+
+ @Override
+ public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getLong();
+ }
+ return numValues;
+ }
+
+ @Override
+ public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getFloat();
+ }
+ return numValues;
+ }
+
+ @Override
+ public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getDouble();
+ }
+ return numValues;
+ }
+
+ private ByteBuffer slice(int docId, ChunkReaderContext context) {
+ if (_isCompressed) {
+ return sliceBytesCompressed(docId, context);
+ } else {
+ return sliceBytesUncompressed(docId);
+ }
+ }
+
+ /**
+ * Helper method to slice one row (value count followed by the values) out of a compressed index chunk.
+ */
+ private ByteBuffer sliceBytesCompressed(int docId, ChunkReaderContext context) {
+ int chunkRowId = docId % _numDocsPerChunk;
+ ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+ // These offsets are offset in the chunk buffer
+ int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+ int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+ // The cast is only needed on JDK 8, where Buffer.position/limit return Buffer rather than ByteBuffer
+ return (ByteBuffer) chunkBuffer.duplicate().position(valueStartOffset).limit(valueEndOffset);
+ }
+
+ /**
+ * Helper method to slice one row (value count followed by the values) out of the uncompressed data buffer.
+ */
+ private ByteBuffer sliceBytesUncompressed(int docId) {
+ int chunkId = docId / _numDocsPerChunk;
+ int chunkRowId = docId % _numDocsPerChunk;
+
+ // These offsets are offset in the data buffer
+ long chunkStartOffset = getChunkPosition(chunkId);
+ long valueStartOffset =
+ chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE);
+ long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+ return _dataBuffer.toDirectByteBuffer(valueStartOffset, (int) (valueEndOffset - valueStartOffset));
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the chunk buffer.
+ */
+ private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+ if (rowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return chunkBuffer.limit();
+ } else {
+ int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+ if (valueEndOffset == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+ return chunkBuffer.limit();
+ } else {
+ return valueEndOffset;
+ }
+ }
+ }
+
+ /**
+ * Helper method to compute the end offset of the value in the data buffer.
+ */
+ private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+ if (chunkId == _numChunks - 1) {
+ // Last chunk
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the last chunk
+ return _dataBuffer.size();
+ } else {
+ int valueEndOffsetInChunk = _dataBuffer
+ .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+ if (valueEndOffsetInChunk == 0) {
+ // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+ return _dataBuffer.size();
+ } else {
+ return chunkStartOffset + valueEndOffsetInChunk;
+ }
+ }
+ } else {
+ if (chunkRowId == _numDocsPerChunk - 1) {
+ // Last row in the chunk
+ return getChunkPosition(chunkId + 1);
+ } else {
+ return chunkStartOffset + _dataBuffer
+ .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+ }
+ }
+ }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..99b4c3f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.IntFunction;
+import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MultiValueFixedByteRawIndexCreatorTest {
+
+ private static final String OUTPUT_DIR =
+ System.getProperty("java.io.tmpdir") + File.separator + "mvFixedRawTest";
+
+ private static final Random RANDOM = new Random();
+
+ @BeforeClass
+ public void setup() throws Exception {
+ FileUtils.forceMkdir(new File(OUTPUT_DIR));
+ }
+
+ /**
+ * Clean up after test
+ */
+ @AfterClass
+ public void cleanup() {
+ FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+ }
+
+ @Test
+ public void testMVInt() throws IOException {
+ testMV(DataType.INT, ints(), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
+ (reader, context, docId, buffer) -> {
+ int length = reader.getIntMV(docId, buffer, context);
+ return Arrays.copyOf(buffer, length);
+ });
+ }
+
+ @Test
+ public void testMVLong() throws IOException {
+ testMV(DataType.LONG, longs(), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
+ (reader, context, docId, buffer) -> {
+ int length = reader.getLongMV(docId, buffer, context);
+ return Arrays.copyOf(buffer, length);
+ });
+ }
+
+ @Test
+ public void testMVFloat() throws IOException {
+ testMV(DataType.FLOAT, floats(), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
+ (reader, context, docId, buffer) -> {
+ int length = reader.getFloatMV(docId, buffer, context);
+ return Arrays.copyOf(buffer, length);
+ });
+ }
+
+ @Test
+ public void testMVDouble() throws IOException {
+ testMV(DataType.DOUBLE, doubles(), x -> x.length, double[]::new, MultiValueFixedByteRawIndexCreator::putDoubleMV,
+ (reader, context, docId, buffer) -> {
+ int length = reader.getDoubleMV(docId, buffer, context);
+ return Arrays.copyOf(buffer, length);
+ });
+ }
+
+
+ public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor,
+ Injector<T> injector, Extractor<T> extractor)
+ throws IOException {
+ String column = "testCol_" + dataType;
+ int numDocs = inputs.size();
+ int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new);
+ File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ file.delete();
+ MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR),
+ ChunkCompressionType.SNAPPY, column, numDocs, dataType, maxElements);
+ inputs.forEach(input -> injector.inject(creator, input));
+ creator.close();
+
+ // Read the index back and check every row against its input
+ final PinotDataBuffer buffer = PinotDataBuffer
+ .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+ FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.BYTES);
+ final ChunkReaderContext context = reader.createContext();
+ T valueBuffer = constructor.apply(maxElements);
+ for (int i = 0; i < numDocs; i++) {
+ Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer));
+ }
+ }
+
+ interface Extractor<T> {
+ T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext context, int offset, T buffer);
+ }
+
+ interface Injector<T> {
+ void inject(MultiValueFixedByteRawIndexCreator creator, T input);
+ }
+
+ private static List<int[]> ints() {
+ return IntStream.range(0, 1000)
+ .mapToObj(i -> new int[RANDOM.nextInt(50)])
+ .peek(array -> {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = RANDOM.nextInt();
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<long[]> longs() {
+ return IntStream.range(0, 1000)
+ .mapToObj(i -> new long[RANDOM.nextInt(50)])
+ .peek(array -> {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = RANDOM.nextLong();
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<float[]> floats() {
+ return IntStream.range(0, 1000)
+ .mapToObj(i -> new float[RANDOM.nextInt(50)])
+ .peek(array -> {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = RANDOM.nextFloat();
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<double[]> doubles() {
+ return IntStream.range(0, 1000)
+ .mapToObj(i -> new double[RANDOM.nextInt(50)])
+ .peek(array -> {
+ for (int i = 0; i < array.length; i++) {
+ array[i] = RANDOM.nextDouble();
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index a1f6e2c..c496e91 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -64,8 +64,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
int maxTotalLength = 500;
File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
file.delete();
- MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
- new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength);
+ MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(new File(OUTPUT_DIR),
+ ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength, maxElements);
List<String[]> inputs = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < numDocs; i++) {
@@ -106,7 +106,7 @@ public class MultiValueVarByteRawIndexCreatorTest {
file.delete();
MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
- maxTotalLength);
+ maxTotalLength, maxElements);
List<byte[][]> inputs = new ArrayList<>();
Random random = new Random();
for (int i = 0; i < numDocs; i++) {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 6393aaf..941522c 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -257,8 +257,4 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
- default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
- throw new UnsupportedOperationException();
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org