Posted to commits@pinot.apache.org by ja...@apache.org on 2021/10/25 23:25:28 UTC

[pinot] branch master updated: implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d6be4  implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)
d4d6be4 is described below

commit d4d6be4e4cc9c84a8da9722b001e8f70ba3781ad
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Tue Oct 26 00:25:13 2021 +0100

    implement FixedByteChunkMVForwardIndexReader, address post-review comments (#7629)
---
 .../creator/impl/SegmentColumnarIndexCreator.java  |  42 ++---
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  32 ++--
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  31 ++--
 .../index/column/PhysicalColumnIndexContainer.java |  13 +-
 .../FixedByteChunkMVForwardIndexReader.java        | 178 +++++++++++++++++++++
 .../MultiValueFixedByteRawIndexCreatorTest.java    | 178 +++++++++++++++++++++
 .../MultiValueVarByteRawIndexCreatorTest.java      |   6 +-
 .../spi/index/reader/ForwardIndexReader.java       |   4 -
 8 files changed, 404 insertions(+), 80 deletions(-)
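
At a glance, the commit adds a reader for raw (non-dictionary-encoded) multi-value columns of fixed-width types. Each row is stored as a 4-byte element count followed by the fixed-width values, which is what the getIntMV/getLongMV/getFloatMV/getDoubleMV methods of the new reader below decode. A minimal, self-contained sketch of that row layout in plain java.nio (the class and method names here are illustrative only, not part of the commit):

    import java.nio.ByteBuffer;

    class FixedByteMvRowLayoutSketch {
      // Encodes one multi-value INT row as [numValues][value_0]...[value_{n-1}],
      // the layout FixedByteChunkMVForwardIndexReader.getIntMV reads back.
      static ByteBuffer encodeIntRow(int[] values) {
        ByteBuffer row = ByteBuffer.allocate(Integer.BYTES * (1 + values.length));
        row.putInt(values.length);   // element count prefix
        for (int v : values) {
          row.putInt(v);             // fixed-width payload
        }
        row.flip();
        return row;
      }

      // Mirrors the reader: read the count, then that many fixed-width values.
      static int decodeIntRow(ByteBuffer row, int[] valueBuffer) {
        int numValues = row.getInt();
        for (int i = 0; i < numValues; i++) {
          valueBuffer[i] = row.getInt();
        }
        return numValues;
      }
    }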

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index dcc0ea2..0ea31d5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -23,7 +23,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -82,6 +81,7 @@ import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
 
@@ -476,23 +476,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
               value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
             }
             if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
-              value = String.valueOf(value);
-              int length = ((String[]) columnValueToIndex).length;
-              columnValueToIndex = new String[length];
-              Arrays.fill((String[]) columnValueToIndex, value);
+              columnValueToIndex = new String[] {String.valueOf(value)};
             } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
-              int length = ((byte[][]) columnValueToIndex).length;
-              columnValueToIndex = new byte[length][];
-              Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+              columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)};
             } else {
               throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
             }
           }
           switch (forwardIndexCreator.getValueType()) {
             case INT:
-              if (columnValueToIndex instanceof int[]) {
-                forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
+              if (columnValueToIndex instanceof Object[]) {
                 int[] array = new int[((Object[]) columnValueToIndex).length];
                 for (int i = 0; i < array.length; i++) {
                   array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
@@ -501,9 +494,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
               }
               break;
             case LONG:
-              if (columnValueToIndex instanceof long[]) {
-                forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
+              if (columnValueToIndex instanceof Object[]) {
                 long[] array = new long[((Object[]) columnValueToIndex).length];
                 for (int i = 0; i < array.length; i++) {
                   array[i] = (Long) ((Object[]) columnValueToIndex)[i];
@@ -512,9 +503,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
               }
               break;
             case FLOAT:
-              if (columnValueToIndex instanceof float[]) {
-                forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
+              if (columnValueToIndex instanceof Object[]) {
                 float[] array = new float[((Object[]) columnValueToIndex).length];
                 for (int i = 0; i < array.length; i++) {
                   array[i] = (Float) ((Object[]) columnValueToIndex)[i];
@@ -523,9 +512,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
               }
               break;
             case DOUBLE:
-              if (columnValueToIndex instanceof double[]) {
-                forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
-              } else if (columnValueToIndex instanceof Object[]) {
+              if (columnValueToIndex instanceof Object[]) {
                 double[] array = new double[((Object[]) columnValueToIndex).length];
                 for (int i = 0; i < array.length; i++) {
                   array[i] = (Double) ((Object[]) columnValueToIndex)[i];
@@ -835,10 +822,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param writerVersion version to use for the raw index writer
    * @return raw index creator
    */
-  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
-      ChunkCompressionType compressionType,
-      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
-      boolean deriveNumDocsPerChunk,
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file, ChunkCompressionType compressionType,
+      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     switch (dataType.getStoredType()) {
@@ -871,9 +856,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @return raw index creator
    */
   public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, final int totalDocs,
-      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
-      int maxRowLengthInBytes)
+      String column, DataType dataType, final int totalDocs, int maxNumberOfMultiValueElements,
+      boolean deriveNumDocsPerChunk, int writerVersion, int maxRowLengthInBytes)
       throws IOException {
     switch (dataType.getStoredType()) {
       case INT:
@@ -881,11 +865,11 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       case FLOAT:
       case DOUBLE:
         return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
-            dataType.getStoredType().size(), maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
+            maxNumberOfMultiValueElements, deriveNumDocsPerChunk, writerVersion);
       case STRING:
       case BYTES:
         return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
-            maxRowLengthInBytes);
+            maxRowLengthInBytes, maxNumberOfMultiValueElements);
       default:
         throw new UnsupportedOperationException(
             "Data type not supported for raw indexing: " + dataType);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index 572c793..c43f8b7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pinot.segment.local.segment.creator.impl.fwd;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.spi.V1Constants.Indexes;
@@ -53,12 +51,10 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
    */
-  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
-      final int maxNumberOfMultiValueElements)
+  public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxNumberOfMultiValueElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+    this(baseIndexDir, compressionType, column, totalDocs, valueType,
         maxNumberOfMultiValueElements, false,
         BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
   }
@@ -71,33 +67,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param maxLengthOfEachEntry length of longest entry (in bytes)
    * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
    * @param writerVersion writer format version
    */
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
-      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
-      int writerVersion)
+      String column, int totalDocs, DataType valueType, int maxNumberOfMultiValueElements,
+      boolean deriveNumDocsPerChunk, int writerVersion)
       throws IOException {
-    File file = new File(baseIndexDir,
-        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
-    FileUtils.deleteQuietly(file);
-    int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
+    File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size();
     int numDocsPerChunk =
-        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+        deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
+            + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
     _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
         numDocsPerChunk, totalMaxLength, writerVersion);
     _valueType = valueType;
   }
 
-  @VisibleForTesting
-  public static int getNumDocsPerChunk(int lengthOfLongestEntry) {
-    int overheadPerEntry =
-        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
-    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
-  }
-
   @Override
   public boolean isDictionaryEncoded() {
     return false;
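
The fixed-byte MV creator now derives the maximum row length from the stored type size and inlines the chunk sizing that the removed getNumDocsPerChunk helper used to provide. A sketch of that arithmetic, with the writer constants passed in as parameters because their concrete values are not part of this diff:

    // Illustrative restatement of the chunk sizing in MultiValueFixedByteRawIndexCreator above.
    // targetMaxChunkSize, rowOffsetSize and defaultNumDocsPerChunk stand in for the
    // TARGET_MAX_CHUNK_SIZE, CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE and DEFAULT_NUM_DOCS_PER_CHUNK constants.
    class FixedByteMvChunkSizingSketch {
      static int numDocsPerChunk(boolean deriveNumDocsPerChunk, int maxNumberOfMultiValueElements,
          int storedTypeSizeInBytes, int targetMaxChunkSize, int rowOffsetSize, int defaultNumDocsPerChunk) {
        // Worst-case payload per row: every element at the fixed stored-type width.
        int totalMaxLength = maxNumberOfMultiValueElements * storedTypeSizeInBytes;
        if (!deriveNumDocsPerChunk) {
          return defaultNumDocsPerChunk;
        }
        // Fit as many worst-case rows (plus one header offset entry each) as the target chunk
        // size allows, but never fewer than one row per chunk.
        return Math.max(targetMaxChunkSize / (totalMaxLength + rowOffsetSize), 1);
      }
    }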
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 5d5b3cf..abfc436 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -49,13 +49,13 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
    * @param maxRowLengthInBytes the length in bytes of the largest row
+   * @param maxNumberOfElements the maximum number of elements in a row
    */
-  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, int maxRowLengthInBytes)
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int maxRowLengthInBytes, int maxNumberOfElements)
       throws IOException {
     this(baseIndexDir, compressionType, column, totalDocs, valueType,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes);
+        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION, maxRowLengthInBytes, maxNumberOfElements);
   }
 
   /**
@@ -67,28 +67,25 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
    * @param maxRowLengthInBytes the size in bytes of the largest row, the chunk size cannot be smaller than this
+   * @param maxNumberOfElements the maximum number of elements in a row
    * @param writerVersion writer format version
    */
-  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column, int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes)
+  public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType, String column,
+      int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements)
       throws IOException {
     //we will prepend the actual content with numElements and length array containing length of each element
-    int totalMaxLength = Integer.BYTES + Math.max(maxRowLengthInBytes, TARGET_MAX_CHUNK_SIZE);
+    int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
+    int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes;
     File file = new File(baseIndexDir,
         column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
-    int numDocsPerChunk = getNumDocsPerChunk(totalMaxLength);
-    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, totalMaxLength,
-        writerVersion);
+    int numDocsPerChunk = Math.max(
+        TARGET_MAX_CHUNK_SIZE / (totalMaxLength + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE),
+        1);
+    _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs, numDocsPerChunk,
+        totalMaxLength, writerVersion);
     _valueType = valueType;
   }
 
-  private static int getNumDocsPerChunk(int lengthOfLongestEntry) {
-    int overheadPerEntry =
-        lengthOfLongestEntry + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
-    return Math.max(TARGET_MAX_CHUNK_SIZE / overheadPerEntry, 1);
-  }
-
   @Override
   public boolean isDictionaryEncoded() {
     return false;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index 9ab1e7d..5d8657b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -45,7 +45,9 @@ import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
@@ -194,7 +196,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       }
     } else {
       // Raw index
-      _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
+      _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType(), metadata.isSingleValue());
       _dictionary = null;
       _rangeIndex = null;
       _invertedIndex = null;
@@ -294,17 +296,20 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     }
   }
 
-  private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType) {
+  private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType,
+      boolean isSingleValue) {
     DataType storedType = dataType.getStoredType();
     switch (storedType) {
       case INT:
       case LONG:
       case FLOAT:
       case DOUBLE:
-        return new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
+        return isSingleValue ? new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+            : new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
       case STRING:
       case BYTES:
-        return new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
+        return isSingleValue ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType)
+            : new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
       default:
         throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
new file mode 100644
index 0000000..b2e745d1
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import java.nio.ByteBuffer;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of
+ * fixed length data type (INT, LONG, FLOAT, DOUBLE).
+ * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
+ */
+public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+
+  private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
+
+  private final int _maxChunkSize;
+
+  public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+    super(dataBuffer, valueType);
+    _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
+  }
+
+  @Nullable
+  @Override
+  public ChunkReaderContext createContext() {
+    if (_isCompressed) {
+      return new ChunkReaderContext(_maxChunkSize);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getInt();
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getLong();
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getFloat();
+    }
+    return numValues;
+  }
+
+  @Override
+  public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getDouble();
+    }
+    return numValues;
+  }
+
+  private ByteBuffer slice(int docId, ChunkReaderContext context) {
+    if (_isCompressed) {
+      return sliceBytesCompressed(docId, context);
+    } else {
+      return sliceBytesUncompressed(docId);
+    }
+  }
+
+  /**
+   * Helper method to read BYTES value from the compressed index.
+   */
+  private ByteBuffer sliceBytesCompressed(int docId, ChunkReaderContext context) {
+    int chunkRowId = docId % _numDocsPerChunk;
+    ByteBuffer chunkBuffer = getChunkBuffer(docId, context);
+
+    // These offsets are offset in the chunk buffer
+    int valueStartOffset = chunkBuffer.getInt(chunkRowId * ROW_OFFSET_SIZE);
+    int valueEndOffset = getValueEndOffset(chunkRowId, chunkBuffer);
+    // cast only for JDK8 compilation profile
+    return (ByteBuffer) chunkBuffer.duplicate().position(valueStartOffset).limit(valueEndOffset);
+  }
+
+  /**
+   * Helper method to read BYTES value from the uncompressed index.
+   */
+  private ByteBuffer sliceBytesUncompressed(int docId) {
+    int chunkId = docId / _numDocsPerChunk;
+    int chunkRowId = docId % _numDocsPerChunk;
+
+    // These offsets are offset in the data buffer
+    long chunkStartOffset = getChunkPosition(chunkId);
+    long valueStartOffset =
+        chunkStartOffset + _dataBuffer.getInt(chunkStartOffset + (long) chunkRowId * ROW_OFFSET_SIZE);
+    long valueEndOffset = getValueEndOffset(chunkId, chunkRowId, chunkStartOffset);
+    return _dataBuffer.toDirectByteBuffer(valueStartOffset, (int) (valueEndOffset - valueStartOffset));
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the chunk buffer.
+   */
+  private int getValueEndOffset(int rowId, ByteBuffer chunkBuffer) {
+    if (rowId == _numDocsPerChunk - 1) {
+      // Last row in the chunk
+      return chunkBuffer.limit();
+    } else {
+      int valueEndOffset = chunkBuffer.getInt((rowId + 1) * ROW_OFFSET_SIZE);
+      if (valueEndOffset == 0) {
+        // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+        return chunkBuffer.limit();
+      } else {
+        return valueEndOffset;
+      }
+    }
+  }
+
+  /**
+   * Helper method to compute the end offset of the value in the data buffer.
+   */
+  private long getValueEndOffset(int chunkId, int chunkRowId, long chunkStartOffset) {
+    if (chunkId == _numChunks - 1) {
+      // Last chunk
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the last chunk
+        return _dataBuffer.size();
+      } else {
+        int valueEndOffsetInChunk = _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+        if (valueEndOffsetInChunk == 0) {
+          // Last row in the last chunk (chunk is incomplete, which stores 0 as the offset for the absent rows)
+          return _dataBuffer.size();
+        } else {
+          return chunkStartOffset + valueEndOffsetInChunk;
+        }
+      }
+    } else {
+      if (chunkRowId == _numDocsPerChunk - 1) {
+        // Last row in the chunk
+        return getChunkPosition(chunkId + 1);
+      } else {
+        return chunkStartOffset + _dataBuffer
+            .getInt(chunkStartOffset + (long) (chunkRowId + 1) * ROW_OFFSET_SIZE);
+      }
+    }
+  }
+}
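
A condensed usage sketch for the new reader, distilled from the test added below; numDocs and maxNumValuesPerRow are assumed to come from segment metadata, and the wrapper class and method names are illustrative rather than part of the commit:

    import java.io.File;
    import java.io.IOException;
    import java.nio.ByteOrder;
    import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
    import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
    import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
    import org.apache.pinot.spi.data.FieldSpec.DataType;

    class FixedByteMvReaderUsageSketch {
      // Reads every INT multi-value row from a raw MV forward index file.
      static void readAll(File indexFile, int numDocs, int maxNumValuesPerRow) throws IOException {
        try (PinotDataBuffer buffer =
            PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "")) {
          FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.INT);
          ChunkReaderContext context = reader.createContext();  // null when the chunks are uncompressed
          int[] valueBuffer = new int[maxNumValuesPerRow];
          for (int docId = 0; docId < numDocs; docId++) {
            int numValues = reader.getIntMV(docId, valueBuffer, context);
            // valueBuffer[0..numValues) now holds the values for docId
          }
        }
      }
    }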
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
new file mode 100644
index 0000000..99b4c3f
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.creator;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.function.IntFunction;
+import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader.ChunkReaderContext;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
+import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MultiValueFixedByteRawIndexCreatorTest {
+
+  private static final String OUTPUT_DIR =
+      System.getProperty("java.io.tmpdir") + File.separator + "mvFixedRawTest";
+
+  private static final Random RANDOM = new Random();
+
+  @BeforeClass
+  public void setup() throws Exception {
+    FileUtils.forceMkdir(new File(OUTPUT_DIR));
+  }
+
+  /**
+   * Clean up after test
+   */
+  @AfterClass
+  public void cleanup() {
+    FileUtils.deleteQuietly(new File(OUTPUT_DIR));
+  }
+
+  @Test
+  public void testMVInt() throws IOException {
+    testMV(DataType.INT, ints(), x -> x.length, int[]::new, MultiValueFixedByteRawIndexCreator::putIntMV,
+        (reader, context, docId, buffer) -> {
+          int length = reader.getIntMV(docId, buffer, context);
+          return Arrays.copyOf(buffer, length);
+        });
+  }
+
+  @Test
+  public void testMVLong() throws IOException {
+    testMV(DataType.LONG, longs(), x -> x.length, long[]::new, MultiValueFixedByteRawIndexCreator::putLongMV,
+        (reader, context, docId, buffer) -> {
+            int length = reader.getLongMV(docId, buffer, context);
+            return Arrays.copyOf(buffer, length);
+        });
+  }
+
+  @Test
+  public void testMVFloat() throws IOException {
+    testMV(DataType.FLOAT, floats(), x -> x.length, float[]::new, MultiValueFixedByteRawIndexCreator::putFloatMV,
+        (reader, context, docId, buffer) -> {
+          int length = reader.getFloatMV(docId, buffer, context);
+          return Arrays.copyOf(buffer, length);
+        });
+  }
+
+  @Test
+  public void testMVDouble() throws IOException {
+    testMV(DataType.DOUBLE, doubles(), x -> x.length, double[]::new, MultiValueFixedByteRawIndexCreator::putDoubleMV,
+        (reader, context, docId, buffer) -> {
+          int length = reader.getDoubleMV(docId, buffer, context);
+          return Arrays.copyOf(buffer, length);
+        });
+  }
+
+
+  public <T> void testMV(DataType dataType, List<T> inputs, ToIntFunction<T> sizeof, IntFunction<T> constructor,
+      Injector<T> injector, Extractor<T> extractor)
+      throws IOException {
+    String column = "testCol_" + dataType;
+    int numDocs = inputs.size();
+    int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new);
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR),
+        ChunkCompressionType.SNAPPY, column, numDocs, dataType, maxElements);
+    inputs.forEach(input -> injector.inject(creator, input));
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.BYTES);
+    final ChunkReaderContext context = reader.createContext();
+    T valueBuffer = constructor.apply(maxElements);
+    for (int i = 0; i < numDocs; i++) {
+      Assert.assertEquals(inputs.get(i), extractor.extract(reader, context, i, valueBuffer));
+    }
+  }
+
+  interface Extractor<T> {
+    T extract(FixedByteChunkMVForwardIndexReader reader, ChunkReaderContext context, int offset, T buffer);
+  }
+
+  interface Injector<T> {
+    void inject(MultiValueFixedByteRawIndexCreator creator, T input);
+  }
+
+  private static List<int[]> ints() {
+    return IntStream.range(0, 1000)
+        .mapToObj(i -> new int[RANDOM.nextInt(50)])
+        .peek(array -> {
+          for (int i = 0; i < array.length; i++) {
+            array[i] = RANDOM.nextInt();
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  private static List<long[]> longs() {
+    return IntStream.range(0, 1000)
+        .mapToObj(i -> new long[RANDOM.nextInt(50)])
+        .peek(array -> {
+          for (int i = 0; i < array.length; i++) {
+            array[i] = RANDOM.nextLong();
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  private static List<float[]> floats() {
+    return IntStream.range(0, 1000)
+        .mapToObj(i -> new float[RANDOM.nextInt(50)])
+        .peek(array -> {
+          for (int i = 0; i < array.length; i++) {
+            array[i] = RANDOM.nextFloat();
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+  private static List<double[]> doubles() {
+    return IntStream.range(0, 1000)
+        .mapToObj(i -> new double[RANDOM.nextInt(50)])
+        .peek(array -> {
+          for (int i = 0; i < array.length; i++) {
+            array[i] = RANDOM.nextDouble();
+          }
+        })
+        .collect(Collectors.toList());
+  }
+
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index a1f6e2c..c496e91 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -64,8 +64,8 @@ public class MultiValueVarByteRawIndexCreatorTest {
     int maxTotalLength = 500;
     File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     file.delete();
-    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
-        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength);
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(new File(OUTPUT_DIR),
+        ChunkCompressionType.SNAPPY, column, numDocs, DataType.STRING, maxTotalLength, maxElements);
     List<String[]> inputs = new ArrayList<>();
     Random random = new Random();
     for (int i = 0; i < numDocs; i++) {
@@ -106,7 +106,7 @@ public class MultiValueVarByteRawIndexCreatorTest {
     file.delete();
     MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
         new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
-        maxTotalLength);
+        maxTotalLength, maxElements);
     List<byte[][]> inputs = new ArrayList<>();
     Random random = new Random();
     for (int i = 0; i < numDocs; i++) {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 6393aaf..941522c 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -257,8 +257,4 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
-  default int getFloatMV(int docId, float[] valueBuffer, T context, int[] parentIndices) {
-    throw new UnsupportedOperationException();
-  }
-
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org