You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2021/10/17 07:19:56 UTC

[pinot] branch mv-fwd-index created (now 10b8d0a)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at 10b8d0a  Initial code for MultiValue forward Index

This branch includes the following new commits:

     new 10b8d0a  Initial code for MultiValue forward Index

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Initial code for MultiValue forward Index

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 10b8d0ac8c8a4e911c5355335ed1e182d0c38543
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 17 00:19:04 2021 -0700

    Initial code for MultiValue forward Index
---
 .../fwd/MultiValueFixedByteRawIndexCreator.java    | 179 +++++++++++++++++
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java | 214 +++++++++++++++++++++
 .../stats/AbstractColumnStatisticsCollector.java   |   5 +
 .../forward/VarByteChunkMVForwardIndexReader.java  | 197 +++++++++++++++++++
 .../MultiValueVarByteRawIndexCreatorTest.java      |  81 ++++++++
 .../org/apache/pinot/segment/spi/V1Constants.java  |   1 +
 .../spi/index/creator/ForwardIndexCreator.java     |   9 +
 .../spi/index/reader/ForwardIndexReader.java       |  19 ++
 8 files changed, 705 insertions(+)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
new file mode 100644
index 0000000..d608a65
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of fixed-width
+ * data types (INT, LONG, FLOAT, DOUBLE).
+ *
+ * <p>All values of one document are serialized into a single byte[] entry that is written through a
+ * {@link VarByteChunkSVForwardIndexWriter}. Entry layout (big-endian, the {@link ByteBuffer} default):
+ * <pre>
+ *   [numValues: int][value 0]...[value numValues-1]
+ * </pre>
+ * Each value occupies {@code Integer.BYTES}, {@code Long.BYTES}, {@code Float.BYTES} or {@code Double.BYTES}
+ * bytes, depending on which put method wrote the entry.
+ */
+public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Creates a raw multi-value index creator for the given column, with the default number of docs per chunk
+   * and the default writer version.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length in bytes of the longest serialized entry (count header plus packed values)
+   * @throws IOException if the index writer cannot be created
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxLength)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Creates a raw multi-value index creator for the given column. Any existing index file for the column is
+   * deleted first.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxLength length in bytes of the longest serialized entry (count header plus packed values)
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param writerVersion writer format version
+   * @throws IOException if the index writer cannot be created
+   */
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk,
+      int writerVersion)
+      throws IOException {
+    // MV extension keeps this file distinct from a single-value raw index of the same column.
+    File file = new File(baseIndexDir, column + V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    int numDocsPerChunk =
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk,
+        maxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  /**
+   * Returns how many docs to put in a chunk so that a chunk of longest-length entries (each with its row-offset
+   * header) stays within {@code TARGET_MAX_CHUNK_SIZE} bytes; never returns less than 1.
+   *
+   * @param lengthOfLongestEntry length in bytes of the longest serialized entry
+   * @return number of docs per chunk
+   */
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  @Override
+  public void putIntMV(int[] values) {
+    ByteBuffer entry = newEntryBuffer(values.length, Integer.BYTES);
+    for (int value : values) {
+      entry.putInt(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putLongMV(long[] values) {
+    ByteBuffer entry = newEntryBuffer(values.length, Long.BYTES);
+    for (long value : values) {
+      entry.putLong(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putFloatMV(float[] values) {
+    ByteBuffer entry = newEntryBuffer(values.length, Float.BYTES);
+    for (float value : values) {
+      entry.putFloat(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void putDoubleMV(double[] values) {
+    ByteBuffer entry = newEntryBuffer(values.length, Double.BYTES);
+    for (double value : values) {
+      entry.putDouble(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  /**
+   * Allocates a heap buffer sized for one entry ({@code Integer.BYTES} count header plus {@code numValues} values
+   * of {@code valueSize} bytes each) and writes the count header, leaving the position at the first value.
+   */
+  private static ByteBuffer newEntryBuffer(int numValues, int valueSize) {
+    ByteBuffer entry = ByteBuffer.allocate(Integer.BYTES + numValues * valueSize);
+    entry.putInt(numValues);
+    return entry;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
new file mode 100644
index 0000000..465b5f7
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Forward index creator for raw (non-dictionary-encoded) multi-value columns of variable-length
+ * data types (STRING, BYTES).
+ *
+ * <p>All values of one document are serialized into a single byte[] entry that is written through a
+ * {@link VarByteChunkSVForwardIndexWriter}. Entry layout (big-endian, the {@link ByteBuffer} default):
+ * <pre>
+ *   [numValues: int][length 0: int]...[length numValues-1: int][bytes 0]...[bytes numValues-1]
+ * </pre>
+ * STRING values are encoded as UTF-8.
+ */
+public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
+
+  private static final int DEFAULT_NUM_DOCS_PER_CHUNK = 1000;
+  private static final int TARGET_MAX_CHUNK_SIZE = 1024 * 1024;
+
+  private final VarByteChunkSVForwardIndexWriter _indexWriter;
+  private final DataType _valueType;
+
+  /**
+   * Creates a raw multi-value index creator for the given column, with the default number of docs per chunk
+   * and the default writer version.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param maxTotalContentLength max total content length, i.e. sum of the value byte lengths of a document
+   * @param maxElements max number of elements in a document
+   * @throws IOException if the index writer cannot be created
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
+      throws IOException {
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength, maxElements,
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+  }
+
+  /**
+   * Creates a raw multi-value index creator for the given column. Any existing index file for the column is
+   * deleted first.
+   *
+   * @param baseIndexDir Index directory
+   * @param compressionType Type of compression to use
+   * @param column Name of column to index
+   * @param totalDocs Total number of documents to index
+   * @param valueType Type of the values
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
+   * @param maxTotalContentLength max total content length, i.e. sum of the value byte lengths of a document
+   * @param maxElements max number of elements in a document
+   * @param writerVersion writer format version
+   * @throws IOException if the index writer cannot be created
+   */
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
+      String column, int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength,
+      int maxElements, int writerVersion)
+      throws IOException {
+    // Longest entry: the value count, one length per element, then the concatenated content.
+    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+    File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+    int numDocsPerChunk =
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk,
+        maxLength, writerVersion);
+    _valueType = valueType;
+  }
+
+  /**
+   * Returns how many docs to put in a chunk so that a chunk of longest-length entries (each with its row-offset
+   * header) stays within {@code TARGET_MAX_CHUNK_SIZE} bytes; never returns less than 1.
+   *
+   * @param lengthOfLongestEntry length in bytes of the longest serialized entry
+   * @return number of docs per chunk
+   */
+  @VisibleForTesting
+  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
+    int overheadPerEntry =
+        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return false;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return _valueType;
+  }
+
+  /**
+   * Writes the next STRING multi-value: each value is UTF-8 encoded exactly once, then the entry is serialized
+   * exactly like a BYTES multi-value.
+   */
+  @Override
+  public void putStringMV(String[] values) {
+    byte[][] encodedValues = new byte[values.length][];
+    for (int i = 0; i < values.length; i++) {
+      encodedValues[i] = values[i].getBytes(StandardCharsets.UTF_8);
+    }
+    putBytesMV(encodedValues);
+  }
+
+  @Override
+  public void putBytesMV(byte[][] values) {
+    int totalBytes = 0;
+    for (byte[] value : values) {
+      totalBytes += value.length;
+    }
+    // numValues, then the length of each value, then the concatenated value bytes
+    ByteBuffer entry = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES * values.length + totalBytes);
+    entry.putInt(values.length);
+    for (byte[] value : values) {
+      entry.putInt(value.length);
+    }
+    for (byte[] value : values) {
+      entry.put(value);
+    }
+    _indexWriter.putBytes(entry.array());
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _indexWriter.close();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 284bf69..6407b55 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -47,6 +47,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
 
   protected int _totalNumberOfEntries = 0;
   protected int _maxNumberOfMultiValues = 0;
+  protected int _maxLengthOfMultiValues = 0; // TODO(review): confirm where this is assigned
   private PartitionFunction _partitionFunction;
   private final int _numPartitions;
   private final Set<Integer> _partitions;
@@ -72,6 +73,10 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _maxNumberOfMultiValues;
   }
 
+  public int getMaxLengthOfMultiValues() { // returns _maxLengthOfMultiValues (initialized to 0)
+    return _maxLengthOfMultiValues;
+  }
+
   void addressSorted(Object entry) {
     if (_isSorted) {
       if (_previousValue != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..eef396c
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of variable-length data
+ * types (STRING, BYTES).
+ *
+ * <p>Each document is stored as one var-byte entry (see {@link VarByteChunkSVForwardIndexWriter} for the chunk
+ * layout) whose content is:
+ * <pre>
+ *   [numValues: int][length 0: int]...[length numValues-1: int][bytes 0]...[bytes numValues-1]
+ * </pre>
+ * STRING values are decoded as UTF-8.
+ */
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    // This reader serves multi-value entries even though it extends the SV chunk reader base class.
+    return false;
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Reads the STRING multi-value of the given doc into {@code valueBuffer}, which must have room for all of the
+   * doc's values, and returns the number of values read.
+   */
+  @Override
+  public int getStringMV(int docId, String[] valueBuffer, ChunkReaderContext context) {
+    byte[] entry = getBytes(docId, context);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(entry);
+    int numValues = byteBuffer.getInt(0);
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      // Decode straight from the entry array; no intermediate copy needed.
+      valueBuffer[i] = new String(entry, contentOffset, length, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  /**
+   * Reads the BYTES multi-value of the given doc into {@code valueBuffer}, which must have room for all of the
+   * doc's values, and returns the number of values read.
+   */
+  @Override
+  public int getBytesMV(int docId, byte[][] valueBuffer, ChunkReaderContext context) {
+    byte[] entry = getBytes(docId, context);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(entry);
+    int numValues = byteBuffer.getInt(0);
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      valueBuffer[i] = Arrays.copyOfRange(entry, contentOffset, contentOffset + length);
+      contentOffset += length;
+    }
+    return numValues;
+  }
+
+  /**
+   * Returns the serialized entry of the given doc (value count, value lengths, concatenated value bytes).
+   * {@code context} is only used when the index is compressed.
+   */
+  @Override
+  public byte[] getBytes(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return getBytesCompressed(docId, context);
+    } else {
+      return getBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read the serialized entry of a doc from the compressed index.
+   */
+  private byte[] getBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+
+    byte[] bytes = new byte[valueEndOffset - valueStartOffset];
+    chunkBuffer.position(valueStartOffset);
+    chunkBuffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to read the serialized entry of a doc from the uncompressed index.
+   */
+  private byte[] getBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+
+    byte[] bytes = new byte[(int) (valueEndOffset - valueStartOffset)];
+    _dataBuffer.copyTo(valueStartOffset, bytes);
+    return bytes;
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
+            .getInt(chunkStartOffset + (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..373c3a9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Round-trip tests for {@link MultiValueVarByteRawIndexCreator} and {@link VarByteChunkMVForwardIndexReader}.
+ */
+public class MultiValueVarByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvVarRawTest";
+
+  @BeforeClass
+  public void setup() throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @DataProvider(name = "compressionTypes")
+  public Object[][] compressionTypes() {
+    // Cover both an uncompressed (PASS_THROUGH) and a compressed (SNAPPY) index.
+    return new Object[][]{{ChunkCompressionType.PASS_THROUGH}, {ChunkCompressionType.SNAPPY}};
+  }
+
+  @Test(dataProvider = "compressionTypes")
+  public void testMV(ChunkCompressionType compressionType)
+      throws IOException {
+    String column = "testCol_" + compressionType;
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    FileUtils.deleteQuietly(file);
+
+    // Fixed seed so that a failure is reproducible.
+    Random random = new Random(42);
+    List<String[]> inputs = new ArrayList<>(numDocs);
+    try (MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(new File(OUTPUT_DIR),
+        compressionType, column, numDocs, DataType.STRING, maxTotalLength, maxElements)) {
+      for (int i = 0; i < numDocs; i++) {
+        // 0 to 9 values per doc; every value is a run of 'a' whose length equals the number of values.
+        int numValues = random.nextInt(10);
+        String[] values = new String[numValues];
+        for (int j = 0; j < numValues; j++) {
+          char[] chars = new char[numValues];
+          Arrays.fill(chars, 'a');
+          values[j] = new String(chars);
+        }
+        inputs.add(values);
+        creator.putStringMV(values);
+      }
+    }
+
+    try (PinotDataBuffer buffer =
+        PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+        VarByteChunkMVForwardIndexReader reader =
+            new VarByteChunkMVForwardIndexReader(buffer, DataType.STRING)) {
+      ChunkReaderContext context = reader.createContext();
+      String[] values = new String[maxElements];
+      for (int i = 0; i < numDocs; i++) {
+        int numValues = reader.getStringMV(i, values, context);
+        Assert.assertEquals(Arrays.copyOf(values, numValues), inputs.get(i), "Mismatch for docId: " + i);
+      }
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
index ac7b704..e037544 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java
@@ -39,6 +39,7 @@ public class V1Constants {
     public static final String UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.unsorted.fwd";
     public static final String SORTED_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.sorted.fwd";
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
+    public static final String RAW_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.raw.fwd"; // raw MV forward index
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
index dee4db1..e5a21e9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/creator/ForwardIndexCreator.java
@@ -173,4 +173,13 @@ public interface ForwardIndexCreator extends Closeable {
   default void putStringMV(String[] values) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Writes the next byte[] type multi-value (one byte[] per value) into the forward index.
+   * @param values Values to write
+   * @throws UnsupportedOperationException by default, unless the creator overrides this method
+   */
+  default void putBytesMV(byte[][] values) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index fb92bec..6393aaf 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -242,4 +242,23 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getStringMV(int docId, String[] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer size
+   * must be enough to hold all the values for the multi-value entry) and returns the number of values within the
+   * multi-value entry. The default implementation throws {@link UnsupportedOperationException}.
+   *
+   * @param docId Document id
+   * @param valueBuffer Value buffer, one byte[] slot per value
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
+    throw new UnsupportedOperationException();
+  }
+  /** Throws UnsupportedOperationException by default. NOTE(review): {@code parentIndices} is undocumented. */
+  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
+    throw new UnsupportedOperationException();
+  }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org