You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2021/10/17 07:19:57 UTC

[pinot] 01/01: Initial code for MultiValue forward Index

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 10b8d0ac8c8a4e911c5355335ed1e182d0c38543
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 17 00:19:04 2021 -0700

    Initial code for MultiValue forward Index
---
 .../fwd/MultiValueFixedByteRawIndexCreator.java    | 179 +++++++++++++++++
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 214 +++++++++++++++++++++
 .../stats/AbstractColumnStatisticsCollector.java   |   5 +
 .../forward/VarByteChunkMVForwardIndexReader.java  | 197 +++++++++++++++++++
 .../MultiValueVarByteRawIndexCreatorTest.java      |  81 ++++++++
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../spi/index/creator/ForwardIndexCreator.java     |   9 +
 .../spi/index/reader/ForwardIndexReader.java       |  19 ++
 8 files changed, 705 insertions(+)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..d608a65
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of fixed-width data types (INT, LONG,
+ * FLOAT, DOUBLE).
+ *
+ * <p>Each document's values are serialized into one byte[] entry that is handed to a
+ * {@link VarByteChunkSVForwardIndexWriter}: a 4-byte value count followed by the values, in {@link ByteBuffer}'s
+ * default (big-endian) byte order.
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Creates a raw multi-value index creator for the given column, using the default writer version and the default
+   * number of docs per chunk.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length (in bytes) of the longest serialized entry, which is 4 bytes for the value count plus
+   *                  the number of values times the size of the value type
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxLength)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Creates a raw multi-value index creator for the given column.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length (in bytes) of the longest serialized entry, which is 4 bytes for the value count plus
+   *                  the number of values times the size of the value type
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param writerVersion writer format version
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk, int writerVersion)
+      throws IOException {
+    // Multi-value index: use the MV raw extension, matching MultiValueVarByteRawIndexCreator.
+    File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter =
+        new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
+            writerVersion);
+    _valueType = valueType;
+  }
+
+  /**
+   * Returns how many entries of the given length fit into a chunk of {@code TARGET_MAX_CHUNK_SIZE} bytes, counting
+   * the per-row offset stored in the chunk header; never less than 1.
+   */
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putIntMV(final int[] values) {
+    ByteBuffer entry = allocateEntry(values.length, Integer.BYTES);
+    for (int value : values) {
+      entry.putInt(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putLongMV(final long[] values) {
+    ByteBuffer entry = allocateEntry(values.length, Long.BYTES);
+    for (long value : values) {
+      entry.putLong(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putFloatMV(final float[] values) {
+    ByteBuffer entry = allocateEntry(values.length, Float.BYTES);
+    for (float value : values) {
+      entry.putFloat(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putDoubleMV(final double[] values) {
+    ByteBuffer entry = allocateEntry(values.length, Double.BYTES);
+    for (double value : values) {
+      entry.putDouble(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+
+  /**
+   * Allocates a heap buffer sized for one serialized entry and writes the value count at its start; callers append
+   * the values and hand {@link ByteBuffer#array()} (whose length equals the entry size) to the writer.
+   */
+  private static ByteBuffer allocateEntry(int numValues, int valueSizeInBytes) {
+    ByteBuffer entry = ByteBuffer.allocate(Integer.BYTES + numValues * valueSizeInBytes);
+    entry.putInt(numValues);
+    return entry;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..465b5f7
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of variable length data types (STRING,
+ * BYTES).
+ *
+ * <p>Each document's values are serialized into one byte[] entry that is handed to a
+ * {@link VarByteChunkSVForwardIndexWriter}: the number of values (int), the byte length of each value (one int per
+ * value), then the concatenated value bytes. STRING values are stored UTF-8 encoded.
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Creates a raw multi-value index creator for the given column, using the default writer version and the default
+   * number of docs per chunk.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxTotalContentLength max total length (in bytes) of all the values of a single document
+   * @param maxElements max number of values of a single document
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength, maxElements,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Creates a raw multi-value index creator for the given column.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param maxTotalContentLength max total length (in bytes) of all the values of a single document
+   * @param maxElements max number of values of a single document
+   * @param writerVersion writer format version
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength, int maxElements,
+      int writerVersion)
+      throws IOException {
+    // The longest entry holds the value count and one length per value in front of the actual content.
+    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+    File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    // Start from a clean file, as MultiValueFixedByteRawIndexCreator does.
+    FileUtils.deleteQuietly(file);
+    int numDocsPerChunk = deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter =
+        new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk, maxLength,
+            writerVersion);
+    _valueType = valueType;
+  }
+
+  /**
+   * Returns how many entries of the given length fit into a chunk of {@code TARGET_MAX_CHUNK_SIZE} bytes, counting
+   * the per-row offset stored in the chunk header; never less than 1.
+   */
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putStringMV(final String[] values) {
+    // Encode every value exactly once, with an explicit charset so that the stored bytes do not depend on the JVM's
+    // default charset; the encoded entry has the same layout as a BYTES multi-value.
+    byte[][] encodedValues = new byte[values.length][];
+    for (int i = 0; i < values.length; i++) {
+      encodedValues[i] = values[i].getBytes(StandardCharsets.UTF_8);
+    }
+    putBytesMV(encodedValues);
+  }
+
+  @Override
+  public void putBytesMV(final byte[][] values) {
+    int totalContentLength = 0;
+    for (byte[] value : values) {
+      totalContentLength += value.length;
+    }
+    // Layout: numValues, one length per value, then the concatenated value bytes.
+    ByteBuffer entry = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES * values.length + totalContentLength);
+    entry.putInt(values.length);
+    for (byte[] value : values) {
+      entry.putInt(value.length);
+    }
+    for (byte[] value : values) {
+      entry.put(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
 
   protected int _totalNumberOfEntries = 0;
   protected int _maxNumberOfMultiValues = 0;
+  protected int _maxLengthOfMultiValues = 0;
   private PartitionFunction _partitionFunction;
   private final int _numPartitions;
   private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _maxNumberOfMultiValues;
   }
 
+  public int getMaxLengthOfMultiValues() {
+    return _maxLengthOfMultiValues; // NOTE(review): nothing in this change assigns it; 0 unless a subclass does
+  }
+
   void addressSorted(Object entry) {
     if (_isSorted) {
       if (_previousValue != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..eef396c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of variable length data types
+ * (STRING, BYTES).
+ *
+ * <p>Each document is stored as one variable-length entry in the chunk format written by
+ * {@link VarByteChunkSVForwardIndexWriter} (refer to its documentation for the chunk layout). Each entry holds the
+ * number of values (int), the byte length of each value (one int per value), then the concatenated value bytes.
+ * STRING values are decoded as UTF-8.
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    // Sized to hold a full chunk: one row offset plus at most the longest entry for every row.
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    // BaseChunkSVForwardIndexReader targets single-value columns; this reader serves multi-value columns.
+    return false;
+  }
+
+  /**
+   * Returns a context holding a reusable chunk buffer for compressed indexes, or {@code null} for uncompressed
+   * indexes, which are read directly from the data buffer.
+   */
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getStringMV(final int docId, final String[] valueBuffer, final ChunkReaderContext context) {
+    byte[] entry = getBytes(docId, context);
+    ByteBuffer entryBuffer = ByteBuffer.wrap(entry);
+    int numValues = entryBuffer.getInt();
+    // The values start right after the value count and the per-value lengths.
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      // Relative reads walk the length array, which directly follows the value count.
+      int length = entryBuffer.getInt();
+      valueBuffer[i] = new String(entry, contentOffset, length, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getBytesMV(final int docId, final byte[][] valueBuffer, final ChunkReaderContext context) {
+    byte[] entry = getBytes(docId, context);
+    ByteBuffer entryBuffer = ByteBuffer.wrap(entry);
+    int numValues = entryBuffer.getInt();
+    // The values start right after the value count and the per-value lengths.
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = entryBuffer.getInt();
+      byte[] value = new byte[length];
+      System.arraycopy(entry, contentOffset, value, 0, length);
+      valueBuffer[i] = value;
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  /**
+   * Returns the raw serialized entry of the given document (value count, value lengths and value bytes).
+   */
+  @Override
+  public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read the serialized entry of a document from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to read the serialized entry of a document from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
+            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..373c3a9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,81 @@
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiValueVarByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+  @BeforeClass
+  public void setup() throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @Test
+  public void testMV() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING,
+        maxTotalLength, maxElements);
+    List<String[]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      //int length = 1;
+      int length = random.nextInt(10);
+      String[] values = new String[length];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value);
+      }
+      inputs.add(values);
+      creator.putStringMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.STRING);
+    final ChunkReaderContext context = reader.createContext();
+    String[] values = new String[maxElements];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getStringMV(i, values, context);
+      String[] readValue = Arrays.copyOf(values, length);
+      Assert.assertEquals(inputs.get(i), readValue);
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index ac7b704..e037544 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
     public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
     public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+    public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
   default void putStringMV(String[] values) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Writes the next byte[] type multi-value into the forward index. Throws UnsupportedOperationException by default.
+   *
+   * @param values Values to write, one byte[] per value of the multi-value entry
+   */
+  default void putBytesMV(byte[][] values) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getStringMV(int docId, String[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer size must
+   * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
+   * entry. Throws UnsupportedOperationException by default.
+   *
+   * @param docId Document id
+   * @param valueBuffer Value buffer, receiving one byte[] per value
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+    throw new UnsupportedOperationException();
+  }
+  // NOTE(review): undocumented and unused in this change; the meaning of parentIndices is not specified.
+  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+    throw new UnsupportedOperationException();
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org