You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2021/10/18 03:46:44 UTC

[pinot] branch mv-fwd-index updated: Wiring in the segment creation driver Impl

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch mv-fwd-index
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/mv-fwd-index by this push:
     new e8bca30  Wiring in the segment creation driver Impl
e8bca30 is described below

commit e8bca30473c28e9e9efe371bd06442955da7d433
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Oct 17 20:46:09 2021 -0700

    Wiring in the segment creation driver Impl
---
 .../apache/pinot/common/utils/PinotDataType.java   |  47 ++-
 .../pinot/core/minion/RawIndexConverter.java       |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |   3 +-
 .../impl/VarByteChunkSVForwardIndexWriter.java     |  43 ++-
 .../creator/impl/SegmentColumnarIndexCreator.java  | 422 ++++++++++++++++-----
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |  22 +-
 .../impl/fwd/MultiValueVarByteRawIndexCreator.java |  23 +-
 .../stats/BytesColumnPredIndexStatsCollector.java  |  56 ++-
 .../local/segment/store/FilePerIndexDirectory.java |   7 +-
 .../MultiValueVarByteRawIndexCreatorTest.java      |  45 ++-
 .../segment/index/creator/RawIndexCreatorTest.java | 147 +++++--
 .../converter/DictionaryToRawIndexConverter.java   |   2 +-
 12 files changed, 627 insertions(+), 192 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 49406ba..a94fee9 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -29,7 +29,6 @@ import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimestampUtils;
 
-
 /**
  *  The <code>PinotDataType</code> enum represents the data type of a value in a row from recordReader and provides
  *  utility methods to convert value across types if applicable.
@@ -583,7 +582,8 @@ public enum PinotDataType {
       try {
         return Base64.getDecoder().decode(value.toString());
       } catch (Exception e) {
-        throw new RuntimeException("Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
+        throw new RuntimeException(
+            "Unable to convert JSON base64 encoded string value to BYTES. Input value: " + value,
             e);
       }
     }
@@ -769,6 +769,12 @@ public enum PinotDataType {
       return sourceType.toStringArray(value);
     }
   },
+  BYTES_ARRAY {
+    @Override
+    public byte[][] convert(Object value, PinotDataType sourceType) {
+      return sourceType.toBytesArray(value);
+    }
+  },
 
   OBJECT_ARRAY;
 
@@ -817,7 +823,8 @@ public enum PinotDataType {
         return JsonUtils.objectToString(value);
       } catch (Exception e) {
         throw new RuntimeException(
-            "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: " + value, e);
+            "Unable to convert " + value.getClass().getCanonicalName() + " to JSON. Input value: "
+                + value, e);
       }
     }
   }
@@ -1020,6 +1027,24 @@ public enum PinotDataType {
     }
   }
 
+  public byte[][] toBytesArray(Object value) {
+    if (value instanceof byte[][]) {
+      return (byte[][]) value;
+    }
+    if (isSingleValue()) {
+      return new byte[][]{toBytes(value)};
+    } else {
+      Object[] valueArray = toObjectArray(value);
+      int length = valueArray.length;
+      byte[][] bytesArray = new byte[length][];
+      PinotDataType singleValueType = getSingleValueType();
+      for (int i = 0; i < length; i++) {
+        bytesArray[i] = singleValueType.toBytes(valueArray[i]);
+      }
+      return bytesArray;
+    }
+  }
+
   private static Object[] toObjectArray(Object array) {
     Class<?> componentType = array.getClass().getComponentType();
     if (componentType.isPrimitive()) {
@@ -1042,7 +1067,8 @@ public enum PinotDataType {
   }
 
   public Object convert(Object value, PinotDataType sourceType) {
-    throw new UnsupportedOperationException("Cannot convert value from " + sourceType + " to " + this);
+    throw new UnsupportedOperationException(
+        "Cannot convert value from " + sourceType + " to " + this);
   }
 
   /**
@@ -1082,6 +1108,8 @@ public enum PinotDataType {
         return DOUBLE;
       case STRING_ARRAY:
         return STRING;
+      case BYTES_ARRAY:
+        return BYTES;
       case OBJECT_ARRAY:
         return OBJECT;
       default:
@@ -1151,6 +1179,9 @@ public enum PinotDataType {
     if (cls == Short.class) {
       return SHORT_ARRAY;
     }
+    if (cls == byte[].class) {
+      return BYTES_ARRAY;
+    }
     return OBJECT_ARRAY;
   }
 
@@ -1210,7 +1241,8 @@ public enum PinotDataType {
         if (fieldSpec.isSingleValueField()) {
           return BYTES;
         } else {
-          throw new IllegalStateException("There is no multi-value type for BYTES");
+          return BYTES_ARRAY;
+//          throw new IllegalStateException("There is no multi-value type for BYTES");
         }
       default:
         throw new UnsupportedOperationException(
@@ -1253,7 +1285,8 @@ public enum PinotDataType {
       case STRING_ARRAY:
         return STRING_ARRAY;
       default:
-        throw new IllegalStateException("Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
+        throw new IllegalStateException(
+            "Cannot convert ColumnDataType: " + columnDataType + " to PinotDataType");
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f1c7fbb..a3b2e24 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -207,7 +207,7 @@ public class RawIndexConverter {
     int numDocs = _originalSegmentMetadata.getTotalDocs();
     int lengthOfLongestEntry = _originalSegmentMetadata.getColumnMetadataFor(columnName).getColumnMaxLength();
     try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
+        .getRawIndexCreatorForSVColumn(_convertedIndexDir, ChunkCompressionType.SNAPPY, columnName, storedType, numDocs,
             lengthOfLongestEntry, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ab5e028..ae36cc6 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -81,11 +81,12 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   protected static final int DEFAULT_HLC_NUM_KAFKA_PARTITIONS = 10;
   protected static final int DEFAULT_MAX_NUM_KAFKA_MESSAGES_PER_BATCH = 10000;
   protected static final List<String> DEFAULT_NO_DICTIONARY_COLUMNS =
-      Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
+      Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime", "RandomAirports");
   protected static final String DEFAULT_SORTED_COLUMN = "Carrier";
   protected static final List<String> DEFAULT_INVERTED_INDEX_COLUMNS = Arrays.asList("FlightNum", "Origin", "Quarter");
   private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
   private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Collections.singletonList("Origin");
+  private static final List<String> DEFAULT_TEXT_INDEX_COLUMNS = Collections.singletonList("RandomAirports");
   protected static final int DEFAULT_NUM_REPLICAS = 1;
   protected static final boolean DEFAULT_NULL_HANDLING_ENABLED = false;
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
index 4444512..44ea807 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkSVForwardIndexWriter.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -25,34 +25,35 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 
-
 /**
  * Class to write out variable length bytes into a single column.
  *
  * The layout of the file is as follows:
  * <p> Header Section: </p>
  * <ul>
- *   <li> Integer: File format version. </li>
- *   <li> Integer: Total number of chunks. </li>
- *   <li> Integer: Number of docs per chunk. </li>
- *   <li> Integer: Length of longest entry (in bytes). </li>
- *   <li> Integer: Total number of docs (version 2 onwards). </li>
- *   <li> Integer: Compression type enum value (version 2 onwards). </li>
- *   <li> Integer: Start offset of data header (version 2 onwards). </li>
- *   <li> Integer array: Integer offsets for all chunks in the data (upto version 2),
- *   Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
+ * <li> Integer: File format version. </li>
+ * <li> Integer: Total number of chunks. </li>
+ * <li> Integer: Number of docs per chunk. </li>
+ * <li> Integer: Length of longest entry (in bytes). </li>
+ * <li> Integer: Total number of docs (version 2 onwards). </li>
+ * <li> Integer: Compression type enum value (version 2 onwards). </li>
+ * <li> Integer: Start offset of data header (version 2 onwards). </li>
+ * <li> Integer array: Integer offsets for all chunks in the data (up to version 2),
+ * Long array: Long offsets for all chunks in the data (version 3 onwards) </li>
  * </ul>
  *
  * <p> Individual Chunks: </p>
  * <ul>
- *   <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for missing rows. </li>
- *   <li> Data bytes. </li>
+ * <li> Integer offsets to start position of rows: For partial chunks, offset values are 0 for
+ * missing rows. </li>
+ * <li> Data bytes. </li>
  * </ul>
  *
  * Only sequential writes are supported.
  */
 @NotThreadSafe
 public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWriter {
+
   public static final int CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE = Integer.BYTES;
 
   private final int _chunkHeaderSize;
@@ -68,9 +69,11 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
    * @param numDocsPerChunk Number of documents per chunk.
    * @param lengthOfLongestEntry Length of longest entry (in bytes)
    * @param writerVersion writer format version
-   * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is not found.
+   * @throws FileNotFoundException Throws {@link FileNotFoundException} if the specified file is
+   *     not found.
    */
-  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType, int totalDocs,
+  public VarByteChunkSVForwardIndexWriter(File file, ChunkCompressionType compressionType,
+      int totalDocs,
       int numDocsPerChunk, int lengthOfLongestEntry, int writerVersion)
       throws FileNotFoundException {
     super(file, compressionType, totalDocs, numDocsPerChunk,
@@ -119,12 +122,12 @@ public class VarByteChunkSVForwardIndexWriter extends BaseChunkSVForwardIndexWri
   /**
    * Helper method to compress and write the current chunk.
    * <ul>
-   *   <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled chunks. </li>
-   *   <li> Compresses and writes the chunk to the data file. </li>
-   *   <li> Updates the header with the current chunks offset. </li>
-   *   <li> Clears up the buffers, so that they can be reused. </li>
+   * <li> Chunk header is of fixed size, so fills out any remaining offsets for partially filled
+   * chunks. </li>
+   * <li> Compresses and writes the chunk to the data file. </li>
+   * <li> Updates the header with the current chunks offset. </li>
+   * <li> Clears up the buffers, so that they can be reused. </li>
    * </ul>
-   *
    */
   protected void writeChunk() {
     // For partially filled chunks, we still need to clear the offsets for remaining rows, as we reuse this buffer.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index 2829e8e..9d6b32b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -23,6 +23,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import java.io.File;
 import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +36,9 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -82,12 +86,12 @@ import org.slf4j.LoggerFactory;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.*;
 import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Segment.*;
 
-
 /**
  * Segment creator which writes data in a columnar form.
  */
 // TODO: check resource leaks
 public class SegmentColumnarIndexCreator implements SegmentCreator {
+
   // TODO Refactor class name to match interface name
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
   // Allow at most 512 characters for the metadata property
@@ -112,7 +116,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Map<String, Map<String, String>> _columnProperties;
 
   @Override
-  public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
+  public void init(SegmentGeneratorConfig segmentCreationSpec,
+      SegmentIndexCreationInfo segmentIndexCreationInfo,
       Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
       throws Exception {
     _docIdCounter = 0;
@@ -121,7 +126,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     _columnProperties = segmentCreationSpec.getColumnProperties();
 
     // Check that the output directory does not exist
-    Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
+    Preconditions
+        .checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
 
     Preconditions.checkState(outDir.mkdirs(), "Failed to create output directory: %s", outDir);
     _indexDir = outDir;
@@ -164,7 +170,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
     for (String columnName : h3IndexConfigs.keySet()) {
       Preconditions
-          .checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema",
+          .checkState(schema.hasColumn(columnName),
+              "Cannot create H3 index for column: %s because it is not in schema",
               columnName);
     }
 
@@ -178,15 +185,18 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       String columnName = fieldSpec.getName();
       DataType storedType = fieldSpec.getDataType().getStoredType();
       ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
-      Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
-      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
+      Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s",
+          columnName);
+      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec,
+          fieldSpec);
 
       if (dictEnabledColumn) {
         // Create dictionary-encoded index
 
         // Initialize dictionary creator
         SegmentDictionaryCreator dictionaryCreator =
-            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(), fieldSpec, _indexDir,
+            new SegmentDictionaryCreator(indexCreationInfo.getSortedUniqueElementsArray(),
+                fieldSpec, _indexDir,
                 indexCreationInfo.isUseVarLengthDictionary());
         _dictionaryCreatorMap.put(columnName, dictionaryCreator);
 
@@ -194,8 +204,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         try {
           dictionaryCreator.build();
         } catch (Exception e) {
-          LOGGER.error("Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
-              fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(), dictionaryCreator.getNumBytesPerEntry());
+          LOGGER.error(
+              "Error building dictionary for field: {}, cardinality: {}, number of bytes per entry: {}",
+              fieldSpec.getName(), indexCreationInfo.getDistinctValueCount(),
+              dictionaryCreator.getNumBytesPerEntry());
           throw e;
         }
 
@@ -204,14 +216,17 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         if (fieldSpec.isSingleValueField()) {
           if (indexCreationInfo.isSorted()) {
             _forwardIndexCreatorMap
-                .put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
+                .put(columnName,
+                    new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _forwardIndexCreatorMap.put(columnName,
-                new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
+                new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+                    _totalDocs));
           }
         } else {
           _forwardIndexCreatorMap.put(columnName,
-              new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs,
+              new MultiValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality,
+                  _totalDocs,
                   indexCreationInfo.getTotalNumberOfEntries()));
         }
 
@@ -219,7 +234,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
           if (segmentCreationSpec.isOnHeap()) {
             _invertedIndexCreatorMap
-                .put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
+                .put(columnName,
+                    new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
           } else {
             _invertedIndexCreatorMap.put(columnName,
                 new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
@@ -230,20 +246,33 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         // Create raw index
 
         // TODO: add support to multi-value column and inverted index
-        Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
-            columnName);
+//        Preconditions.checkState(fieldSpec.isSingleValueField(), "Cannot create raw index for multi-value column: %s",
+//            columnName);
         Preconditions.checkState(!invertedIndexColumns.contains(columnName),
             "Cannot create inverted index for raw index column: %s", columnName);
 
-        ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec, fieldSpec);
+        ChunkCompressionType compressionType = getColumnCompressionType(segmentCreationSpec,
+            fieldSpec);
 
         // Initialize forward index creator
         boolean deriveNumDocsPerChunk =
             shouldDeriveNumDocsPerChunk(columnName, segmentCreationSpec.getColumnProperties());
-        int writerVersion = rawIndexWriterVersion(columnName, segmentCreationSpec.getColumnProperties());
-        _forwardIndexCreatorMap.put(columnName,
-            getRawIndexCreatorForColumn(_indexDir, compressionType, columnName, storedType, _totalDocs,
-                indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk, writerVersion));
+        int writerVersion = rawIndexWriterVersion(columnName,
+            segmentCreationSpec.getColumnProperties());
+        if (fieldSpec.isSingleValueField()) {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForSVColumn(_indexDir, compressionType, columnName, storedType,
+                  _totalDocs,
+                  indexCreationInfo.getLengthOfLongestEntry(), deriveNumDocsPerChunk,
+                  writerVersion));
+        } else {
+          _forwardIndexCreatorMap.put(columnName,
+              getRawIndexCreatorForMVColumn(_indexDir, compressionType, columnName, storedType,
+                  _totalDocs,
+                  indexCreationInfo.getLengthOfLongestEntry(),
+                  indexCreationInfo.getMaxNumberOfMultiValueElements(), deriveNumDocsPerChunk,
+                  writerVersion));
+        }
       }
 
       if (textIndexColumns.contains(columnName)) {
@@ -251,18 +280,22 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "Text index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns");
+            .checkState(storedType == DataType.STRING,
+                "Text index is currently only supported on STRING type columns");
         _textIndexCreatorMap
-            .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
+            .put(columnName,
+                new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
       }
 
       if (fstIndexColumns.contains(columnName)) {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "FST index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns");
+            .checkState(storedType == DataType.STRING,
+                "FST index is currently only supported on STRING type columns");
         Preconditions
-            .checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns");
+            .checkState(dictEnabledColumn,
+                "FST index is currently only supported on dictionary-encoded columns");
         _fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
             (String[]) indexCreationInfo.getSortedUniqueElementsArray()));
       }
@@ -271,7 +304,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         Preconditions.checkState(fieldSpec.isSingleValueField(),
             "Json index is currently only supported on single-value columns");
         Preconditions
-            .checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns");
+            .checkState(storedType == DataType.STRING,
+                "Json index is currently only supported on STRING columns");
         JsonIndexCreator jsonIndexCreator =
             segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
                 : new OffHeapJsonIndexCreator(_indexDir, columnName);
@@ -281,11 +315,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
       if (h3IndexConfig != null) {
         Preconditions
-            .checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns");
-        Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
+            .checkState(fieldSpec.isSingleValueField(),
+                "H3 index is currently only supported on single-value columns");
+        Preconditions.checkState(storedType == DataType.BYTES,
+            "H3 index is currently only supported on BYTES columns");
         H3IndexResolution resolution = h3IndexConfig.getResolution();
         GeoSpatialIndexCreator h3IndexCreator =
-            segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName, resolution)
+            segmentCreationSpec.isOnHeap() ? new OnHeapH3IndexCreator(_indexDir, columnName,
+                resolution)
                 : new OffHeapH3IndexCreator(_indexDir, columnName, resolution);
         _h3IndexCreatorMap.put(columnName, h3IndexCreator);
       }
@@ -293,7 +330,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       _nullHandlingEnabled = _config.isNullHandlingEnabled();
       if (_nullHandlingEnabled) {
         // Initialize Null value vector map
-        _nullValueVectorCreatorMap.put(columnName, new NullValueVectorCreator(_indexDir, columnName));
+        _nullValueVectorCreatorMap
+            .put(columnName, new NullValueVectorCreator(_indexDir, columnName));
       }
     }
   }
@@ -308,7 +346,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     return false;
   }
 
-  public static int rawIndexWriterVersion(String columnName, Map<String, Map<String, String>> columnProperties) {
+  public static int rawIndexWriterVersion(String columnName,
+      Map<String, Map<String, String>> columnProperties) {
     if (columnProperties != null && columnProperties.get(columnName) != null) {
       Map<String, String> properties = columnProperties.get(columnName);
       String version = properties.get(FieldConfig.RAW_INDEX_WRITER_VERSION);
@@ -321,20 +360,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   }
 
   /**
-   * Helper method that returns compression type to use based on segment creation spec and field type.
+   * Helper method that returns compression type to use based on segment creation spec and field
+   * type.
    * <ul>
-   *   <li> Returns compression type from segment creation spec, if specified there.</li>
-   *   <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics are likely
-   *        to be spread in different chunks after applying predicates. Same could be true for dimensions, but in that
-   *        case, clients are expected to explicitly specify the appropriate compression type in the spec. </li>
+   * <li> Returns compression type from segment creation spec, if specified there.</li>
+   * <li> Else, returns PASS_THROUGH for metrics, and SNAPPY for dimensions. This is because metrics
+   * are likely
+   * to be spread in different chunks after applying predicates. Same could be true for dimensions,
+   * but in that
+   * case, clients are expected to explicitly specify the appropriate compression type in the spec.
+   * </li>
    * </ul>
+   *
    * @param segmentCreationSpec Segment creation spec
    * @param fieldSpec Field spec for the column
    * @return Compression type to use
    */
   private ChunkCompressionType getColumnCompressionType(SegmentGeneratorConfig segmentCreationSpec,
       FieldSpec fieldSpec) {
-    ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType().get(fieldSpec.getName());
+    ChunkCompressionType compressionType = segmentCreationSpec.getRawIndexCompressionType()
+        .get(fieldSpec.getName());
     if (compressionType == null) {
       if (fieldSpec.getFieldType() == FieldType.METRIC) {
         return ChunkCompressionType.PASS_THROUGH;
@@ -350,8 +395,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * Returns true if dictionary should be created for a column, false otherwise.
    * Currently there are two sources for this config:
    * <ul>
-   *   <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
-   *   <li> SegmentGeneratorConfig</li>
+   * <li> ColumnIndexCreationInfo (this is currently hard-coded to always return dictionary). </li>
+   * <li> SegmentGeneratorConfig</li>
    * </ul>
    *
    * This method gives preference to the SegmentGeneratorConfig first.
@@ -361,15 +406,16 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param spec Field spec for the column
    * @return True if dictionary should be created for the column, false otherwise
    */
-  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info, SegmentGeneratorConfig config,
+  private boolean createDictionaryForColumn(ColumnIndexCreationInfo info,
+      SegmentGeneratorConfig config,
       FieldSpec spec) {
     String column = spec.getName();
     if (config.getRawIndexCreationColumns().contains(column) || config.getRawIndexCompressionType()
         .containsKey(column)) {
-      if (!spec.isSingleValueField()) {
-        throw new RuntimeException(
-            "Creation of indices without dictionaries is supported for single valued columns only.");
-      }
+//      if (!spec.isSingleValueField()) {
+//        throw new RuntimeException(
+//            "Creation of indices without dictionaries is supported for single valued columns only.");
+//      }
       return false;
     }
     return info.isCreateDictionary();
@@ -387,16 +433,19 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         throw new RuntimeException("Null value for column:" + columnName);
       }
 
-      boolean isSingleValue = _schema.getFieldSpecFor(columnName).isSingleValueField();
+      FieldSpec fieldSpec = _schema.getFieldSpecFor(columnName);
+
+      // Get the dictionary creator; it is null if the column is not dictionary encoded
       SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(columnName);
 
-      if (isSingleValue) {
-        // SV column
-        // text-index enabled SV column
-        TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
-        if (textIndexCreator != null) {
-          textIndexCreator.add((String) columnValueToIndex);
-        }
+      // text-index
+      TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
+      if (textIndexCreator != null) {
+        textIndexCreator.add((String) columnValueToIndex);
+      }
+
+      if (fieldSpec.isSingleValueField()) {
+        // Single Value column
         JsonIndexCreator jsonIndexCreator = _jsonIndexCreatorMap.get(columnName);
         if (jsonIndexCreator != null) {
           jsonIndexCreator.add((String) columnValueToIndex);
@@ -411,7 +460,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
           // store the docID -> dictID mapping in forward index
           forwardIndexCreator.putDictId(dictId);
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+              .get(columnName);
           if (invertedIndexCreator != null) {
             // if inverted index enabled during segment creation,
             // then store dictID -> docID mapping in inverted index
@@ -423,7 +473,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
             // for text index on raw columns, check the config to determine if actual raw value should
             // be stored or not
-            columnValueToIndex = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            columnValueToIndex = _columnProperties.get(columnName)
+                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
             if (columnValueToIndex == null) {
               columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
             }
@@ -452,12 +503,120 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           }
         }
       } else {
-        // MV column (always dictionary encoded)
-        int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-        forwardIndexCreator.putDictIdMV(dictIds);
-        DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
-        if (invertedIndexCreator != null) {
-          invertedIndexCreator.add(dictIds, dictIds.length);
+        if (dictionaryCreator != null) {
+          //dictionary encoded
+          int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
+          forwardIndexCreator.putDictIdMV(dictIds);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
+              .get(columnName);
+          if (invertedIndexCreator != null) {
+            invertedIndexCreator.add(dictIds, dictIds.length);
+          }
+        } else {
+          // for text index on raw columns, check the config to determine if actual raw value should
+          // be stored or not
+          if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+            Object value = _columnProperties.get(columnName)
+                .get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+            if (value == null) {
+              value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
+            }
+            if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+              value = String.valueOf(value);
+              int length = ((String[]) columnValueToIndex).length;
+              columnValueToIndex = new String[length];
+              Arrays.fill((String[]) columnValueToIndex, value);
+            } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+              int length = ((byte[][]) columnValueToIndex).length;
+              columnValueToIndex = new byte[length][];
+              Arrays.fill((byte[][]) columnValueToIndex, String.valueOf(value).getBytes());
+            } else {
+              throw new RuntimeException(
+                  "Text Index is only supported for STRING and BYTES stored type");
+            }
+          }
+          switch (forwardIndexCreator.getValueType()) {
+            case INT:
+              if (columnValueToIndex instanceof int[]) {
+                forwardIndexCreator.putIntMV((int[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                int[] array = new int[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putIntMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            case LONG:
+              if (columnValueToIndex instanceof long[]) {
+                forwardIndexCreator.putLongMV((long[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                long[] array = new long[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Long) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putLongMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            case FLOAT:
+              if (columnValueToIndex instanceof float[]) {
+                forwardIndexCreator.putFloatMV((float[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                float[] array = new float[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Float) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putFloatMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            case DOUBLE:
+              if (columnValueToIndex instanceof double[]) {
+                forwardIndexCreator.putDoubleMV((double[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                double[] array = new double[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (Double) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putDoubleMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            case STRING:
+              if (columnValueToIndex instanceof String[]) {
+                forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                String[] array = new String[((Object[]) columnValueToIndex).length];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (String) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putStringMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            case BYTES:
+              if (columnValueToIndex instanceof byte[][]) {
+                forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
+              } else if (columnValueToIndex instanceof Object[]) {
+                byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
+                for (int i = 0; i < array.length; i++) {
+                  array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
+                }
+                forwardIndexCreator.putBytesMV(array);
+              } else {
+                //TODO: is this possible?
+              }
+              break;
+            default:
+              throw new IllegalStateException();
+          }
         }
       }
 
@@ -468,6 +627,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         }
       }
     }
+
     _docIdCounter++;
   }
 
@@ -491,7 +651,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   @Override
   public void seal()
       throws ConfigurationException, IOException {
-    for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
+    for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap
+        .values()) {
       invertedIndexCreator.seal();
     }
     for (TextIndexCreator textIndexCreator : _textIndexCreatorMap.values()) {
@@ -515,10 +676,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private void writeMetadata()
       throws ConfigurationException {
     PropertiesConfiguration properties =
-        new PropertiesConfiguration(new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
+        new PropertiesConfiguration(
+            new File(_indexDir, V1Constants.MetadataKeys.METADATA_FILE_NAME));
 
     properties.setProperty(SEGMENT_CREATOR_VERSION, _config.getCreatorVersion());
-    properties.setProperty(SEGMENT_PADDING_CHARACTER, String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
+    properties.setProperty(SEGMENT_PADDING_CHARACTER,
+        String.valueOf(V1Constants.Str.DEFAULT_STRING_PAD_CHAR));
     properties.setProperty(SEGMENT_NAME, _segmentName);
     properties.setProperty(TABLE_NAME, _config.getTableName());
     properties.setProperty(DIMENSIONS, _config.getDimensions());
@@ -530,7 +693,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
     // Write time related metadata (start time, end time, time unit)
     if (timeColumnName != null) {
-      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap.get(timeColumnName);
+      ColumnIndexCreationInfo timeColumnIndexCreationInfo = _indexCreationInfoMap
+          .get(timeColumnName);
       if (timeColumnIndexCreationInfo != null) {
         long startTime;
         long endTime;
@@ -548,7 +712,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
             if (_config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
               // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
-              DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(_config.getSimpleDateFormat());
+              DateTimeFormatter dateTimeFormatter = DateTimeFormat
+                  .forPattern(_config.getSimpleDateFormat());
               startTime = dateTimeFormatter.parseMillis(startTimeStr);
               endTime = dateTimeFormatter.parseMillis(endTimeStr);
               timeUnit = TimeUnit.MILLISECONDS;
@@ -575,10 +740,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
         if (!_config.isSkipTimeValueCheck()) {
           Interval timeInterval =
-              new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+              new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime),
+                  DateTimeZone.UTC);
           Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
               "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
-              timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName,
+              timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(),
+              timeColumnName,
               TimeUtils.VALID_TIME_INTERVAL);
         }
 
@@ -596,7 +763,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       String column = entry.getKey();
       ColumnIndexCreationInfo columnIndexCreationInfo = entry.getValue();
       SegmentDictionaryCreator dictionaryCreator = _dictionaryCreatorMap.get(column);
-      int dictionaryElementSize = (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
+      int dictionaryElementSize =
+          (dictionaryCreator != null) ? dictionaryCreator.getNumBytesPerEntry() : 0;
 
       // TODO: after fixing the server-side dependency on HAS_INVERTED_INDEX and deployed, set HAS_INVERTED_INDEX
       //  properly
@@ -620,17 +788,24 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
       boolean hasJsonIndex = _jsonIndexCreatorMap.containsKey(column);
 
-      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs, _schema.getFieldSpecFor(column),
-          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex, textIndexType,
+      addColumnMetadataInfo(properties, column, columnIndexCreationInfo, _totalDocs,
+          _schema.getFieldSpecFor(column),
+          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex,
+          textIndexType,
           hasFSTIndex, hasJsonIndex);
     }
 
     properties.save();
+//    StringWriter sw = new StringWriter();
+//    properties.save(sw);
+//    System.out.println(sw.toString());
   }
 
   public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
-      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
-      int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType, boolean hasFSTIndex,
+      ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec,
+      boolean hasDictionary,
+      int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType,
+      boolean hasFSTIndex,
       boolean hasJsonIndex) {
     int cardinality = columnIndexCreationInfo.getDistinctValueCount();
     properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
@@ -639,35 +814,44 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(getKeyFor(column, DATA_TYPE), String.valueOf(dataType));
     properties.setProperty(getKeyFor(column, BITS_PER_ELEMENT),
         String.valueOf(PinotDataBitSet.getNumBitsPerValue(cardinality - 1)));
-    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(dictionaryElementSize));
-    properties.setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
-    properties.setProperty(getKeyFor(column, IS_SORTED), String.valueOf(columnIndexCreationInfo.isSorted()));
-    properties.setProperty(getKeyFor(column, HAS_NULL_VALUE), String.valueOf(columnIndexCreationInfo.hasNulls()));
+    properties.setProperty(getKeyFor(column, DICTIONARY_ELEMENT_SIZE),
+        String.valueOf(dictionaryElementSize));
+    properties
+        .setProperty(getKeyFor(column, COLUMN_TYPE), String.valueOf(fieldSpec.getFieldType()));
+    properties.setProperty(getKeyFor(column, IS_SORTED),
+        String.valueOf(columnIndexCreationInfo.isSorted()));
+    properties.setProperty(getKeyFor(column, HAS_NULL_VALUE),
+        String.valueOf(columnIndexCreationInfo.hasNulls()));
     properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
     properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), textIndexType.name());
     properties.setProperty(getKeyFor(column, HAS_INVERTED_INDEX), String.valueOf(hasInvertedIndex));
     properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
     properties.setProperty(getKeyFor(column, HAS_JSON_INDEX), String.valueOf(hasJsonIndex));
-    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
+    properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED),
+        String.valueOf(fieldSpec.isSingleValueField()));
     properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMENTS),
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
     properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
         String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
     properties
-        .setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
+        .setProperty(getKeyFor(column, IS_AUTO_GENERATED),
+            String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
 
     PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
     if (partitionFunction != null) {
       properties.setProperty(getKeyFor(column, PARTITION_FUNCTION), partitionFunction.toString());
-      properties.setProperty(getKeyFor(column, NUM_PARTITIONS), columnIndexCreationInfo.getNumPartitions());
-      properties.setProperty(getKeyFor(column, PARTITION_VALUES), columnIndexCreationInfo.getPartitions());
+      properties.setProperty(getKeyFor(column, NUM_PARTITIONS),
+          columnIndexCreationInfo.getNumPartitions());
+      properties.setProperty(getKeyFor(column, PARTITION_VALUES),
+          columnIndexCreationInfo.getPartitions());
     }
 
     // datetime field
     if (fieldSpec.getFieldType().equals(FieldType.DATE_TIME)) {
       DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
       properties.setProperty(getKeyFor(column, DATETIME_FORMAT), dateTimeFieldSpec.getFormat());
-      properties.setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
+      properties
+          .setProperty(getKeyFor(column, DATETIME_GRANULARITY), dateTimeFieldSpec.getGranularity());
     }
 
     // NOTE: Min/max could be null for real-time aggregate metrics.
@@ -685,7 +869,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
-  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column, String minValue,
+  public static void addColumnMinMaxValueInfo(PropertiesConfiguration properties, String column,
+      String minValue,
       String maxValue) {
     if (isValidPropertyValue(minValue)) {
       properties.setProperty(getKeyFor(column, MIN_VALUE), minValue);
@@ -699,9 +884,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * Helper method to check whether the given value is a valid property value.
    * <p>Value is invalid iff:
    * <ul>
-   *   <li>It contains more than 512 characters</li>
-   *   <li>It contains leading/trailing whitespace</li>
-   *   <li>It contains list separator (',')</li>
+   * <li>It contains more than 512 characters</li>
+   * <li>It contains leading/trailing whitespace</li>
+   * <li>It contains list separator (',')</li>
    * </ul>
    */
   @VisibleForTesting
@@ -713,7 +898,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     if (length > METADATA_PROPERTY_LENGTH_LIMIT) {
       return false;
     }
-    if (Character.isWhitespace(value.charAt(0)) || Character.isWhitespace(value.charAt(length - 1))) {
+    if (Character.isWhitespace(value.charAt(0)) || Character
+        .isWhitespace(value.charAt(length - 1))) {
       return false;
     }
     return value.indexOf(',') == -1;
@@ -731,13 +917,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
    * @param column Column name
    * @param totalDocs Total number of documents to index
    * @param lengthOfLongestEntry Length of longest entry
-   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows per chunk
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
    * @param writerVersion version to use for the raw index writer
    * @return raw index creator
-   * @throws IOException
    */
-  public static ForwardIndexCreator getRawIndexCreatorForColumn(File file, ChunkCompressionType compressionType,
-      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry, boolean deriveNumDocsPerChunk,
+  public static ForwardIndexCreator getRawIndexCreatorForSVColumn(File file,
+      ChunkCompressionType compressionType,
+      String column, DataType dataType, int totalDocs, int lengthOfLongestEntry,
+      boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     switch (dataType.getStoredType()) {
@@ -745,14 +933,56 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       case LONG:
       case FLOAT:
       case DOUBLE:
-        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+        return new SingleValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType,
             writerVersion);
       case STRING:
       case BYTES:
-        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType,
+        return new SingleValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType,
             lengthOfLongestEntry, deriveNumDocsPerChunk, writerVersion);
       default:
-        throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
+        throw new UnsupportedOperationException(
+            "Data type not supported for raw indexing: " + dataType);
+    }
+  }
+
+  /**
+   * Helper method to build the raw index creator for the column.
+   * Assumes that column to be indexed is multi valued.
+   *
+   * @param file Output index file
+   * @param column Column name
+   * @param totalDocs Total number of documents to index
+   * @param lengthOfLongestEntry Length of longest entry
+   * @param deriveNumDocsPerChunk true if varbyte writer should auto-derive the number of rows
+   *     per chunk
+   * @param writerVersion version to use for the raw index writer
+   * @return raw index creator
+   */
+  public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file,
+      ChunkCompressionType compressionType,
+      String column, DataType dataType, final int totalDocs,
+      int lengthOfLongestEntry,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
+      int writerVersion)
+      throws IOException {
+    switch (dataType.getStoredType()) {
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+        return new MultiValueFixedByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType, dataType.getStoredType().size(), maxNumberOfMultiValueElements,
+            deriveNumDocsPerChunk, writerVersion);
+      case STRING:
+      case BYTES:
+        return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs,
+            dataType, lengthOfLongestEntry, maxNumberOfMultiValueElements, deriveNumDocsPerChunk,
+            writerVersion);
+      default:
+        throw new UnsupportedOperationException(
+            "Data type not supported for raw indexing: " + dataType);
     }
   }
 
@@ -760,8 +990,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   public void close()
       throws IOException {
     FileUtils.close(Iterables
-        .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
-            _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(),
+        .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(),
+            _invertedIndexCreatorMap.values(),
+            _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(),
+            _jsonIndexCreatorMap.values(),
             _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index d608a65..de11500 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -51,13 +52,14 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param maxLength length of longest entry (in bytes)
    */
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
       String column,
-      int totalDocs, DataType valueType, int maxLength)
+      int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLength, false,
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxLengthOfEachEntry,
+        maxNumberOfMultiValueElements, false,
         BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
   }
 
@@ -69,23 +71,23 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param maxLength length of longest entry (in bytes)
+   * @param maxLengthOfEachEntry length of longest entry (in bytes)
    * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
    * @param writerVersion writer format version
    */
   public MultiValueFixedByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, int maxLength, boolean deriveNumDocsPerChunk,
+      String column, int totalDocs, DataType valueType, final int maxLengthOfEachEntry,
+      final int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk,
       int writerVersion)
       throws IOException {
     File file = new File(baseIndexDir,
-        column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+        column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     FileUtils.deleteQuietly(file);
+    int totalMaxLength = maxNumberOfMultiValueElements * maxLengthOfEachEntry;
     int numDocsPerChunk =
-        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
     _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, maxLength,
-        writerVersion);
+        numDocsPerChunk, totalMaxLength, writerVersion);
     _valueType = valueType;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
index 465b5f7..9264bde 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueVarByteRawIndexCreator.java
@@ -65,9 +65,8 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
       String column,
       int totalDocs, DataType valueType, int maxTotalContentLength, int maxElements)
       throws IOException {
-    this(baseIndexDir, compressionType, column, totalDocs, valueType, false, maxTotalContentLength,
-        maxElements,
-        BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
+    this(baseIndexDir, compressionType, column, totalDocs, valueType, maxTotalContentLength,
+        maxElements, false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
   }
 
   /**
@@ -78,25 +77,24 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
    * @param column Name of column to index
    * @param totalDocs Total number of documents to index
    * @param valueType Type of the values
-   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per chunk
-   * @param maxTotalContentLength max total content length
+   * @param maxLength max length for each entry
    * @param maxElements max number of elements
+   * @param deriveNumDocsPerChunk true if writer should auto-derive the number of rows per
+   *     chunk
    * @param writerVersion writer format version
    */
   public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType compressionType,
-      String column,
-      int totalDocs, DataType valueType, boolean deriveNumDocsPerChunk, int maxTotalContentLength,
-      int maxElements,
-      int writerVersion)
+      String column, int totalDocs, DataType valueType,
+      int maxLength, int maxElements, boolean deriveNumDocsPerChunk, int writerVersion)
       throws IOException {
     //we will prepend the actual content with numElements and length array containing length of each element
-    int maxLength = Integer.BYTES + maxElements * Integer.BYTES + maxTotalContentLength;
+    int totalMaxLength = Integer.BYTES + maxElements * Integer.BYTES + maxLength * maxElements;
     File file = new File(baseIndexDir,
         column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
     int numDocsPerChunk =
-        deriveNumDocsPerChunk ? getNumDocsPerChunk(maxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
+        deriveNumDocsPerChunk ? getNumDocsPerChunk(totalMaxLength) : DEFAULT_NUM_DOCS_PER_CHUNK;
     _indexWriter = new VarByteChunkSVForwardIndexWriter(file, compressionType, totalDocs,
-        numDocsPerChunk, maxLength,
+        numDocsPerChunk, totalMaxLength,
         writerVersion);
     _valueType = valueType;
   }
@@ -151,6 +149,7 @@ public class MultiValueVarByteRawIndexCreator implements ForwardIndexCreator {
 
   @Override
   public void putBytesMV(final byte[][] values) {
+
     int totalBytes = 0;
     for (int i = 0; i < values.length; i++) {
       int length = values[i].length;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index a0cfd66..908be20 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,14 +21,15 @@ package org.apache.pinot.segment.local.segment.creator.impl.stats;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.util.Arrays;
 import java.util.Set;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.spi.utils.ByteArray;
 
-
 /**
  * Extension of {@link AbstractColumnStatisticsCollector} for byte[] column type.
  */
 public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatisticsCollector {
+
   private final Set<ByteArray> _values = new ObjectOpenHashSet<>(INITIAL_HASH_SET_SIZE);
 
   private int _minLength = Integer.MAX_VALUE;
@@ -36,22 +37,36 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
   private ByteArray[] _sortedValues;
   private boolean _sealed = false;
 
-  public BytesColumnPredIndexStatsCollector(String column, StatsCollectorConfig statsCollectorConfig) {
+  public BytesColumnPredIndexStatsCollector(String column,
+      StatsCollectorConfig statsCollectorConfig) {
     super(column, statsCollectorConfig);
   }
 
   @Override
   public void collect(Object entry) {
-    ByteArray value = new ByteArray((byte[]) entry);
-    addressSorted(value);
-    updatePartition(value);
-    _values.add(value);
-
-    int length = value.length();
-    _minLength = Math.min(_minLength, length);
-    _maxLength = Math.max(_maxLength, length);
-
-    _totalNumberOfEntries++;
+    if (entry instanceof Object[]) {
+      Object[] values = (Object[]) entry;
+      for (Object obj : values) {
+        ByteArray value = new ByteArray((byte[]) obj);
+        _values.add(value);
+        int length = value.length();
+        _minLength = Math.min(_minLength, length);
+        _maxLength = Math.max(_maxLength, length);
+      }
+      _maxNumberOfMultiValues = Math.max(_maxNumberOfMultiValues, values.length);
+      updateTotalNumberOfEntries(values);
+    } else {
+      ByteArray value = new ByteArray((byte[]) entry);
+      addressSorted(value);
+      updatePartition(value);
+      _values.add(value);
+
+      int length = value.length();
+      _minLength = Math.min(_minLength, length);
+      _maxLength = Math.max(_maxLength, length);
+
+      _totalNumberOfEntries++;
+    }
   }
 
   @Override
@@ -59,7 +74,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
     if (_sealed) {
       return _sortedValues[0];
     }
-    throw new IllegalStateException("you must seal the collector first before asking for min value");
+    throw new IllegalStateException(
+        "you must seal the collector first before asking for min value");
   }
 
   @Override
@@ -67,7 +83,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
     if (_sealed) {
       return _sortedValues[_sortedValues.length - 1];
     }
-    throw new IllegalStateException("you must seal the collector first before asking for max value");
+    throw new IllegalStateException(
+        "you must seal the collector first before asking for max value");
   }
 
   @Override
@@ -75,7 +92,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
     if (_sealed) {
       return _sortedValues;
     }
-    throw new IllegalStateException("you must seal the collector first before asking for unique values set");
+    throw new IllegalStateException(
+        "you must seal the collector first before asking for unique values set");
   }
 
   @Override
@@ -88,7 +106,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
     if (_sealed) {
       return _maxLength;
     }
-    throw new IllegalStateException("you must seal the collector first before asking for longest value");
+    throw new IllegalStateException(
+        "you must seal the collector first before asking for longest value");
   }
 
   @Override
@@ -96,7 +115,8 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
     if (_sealed) {
       return _sortedValues.length;
     }
-    throw new IllegalStateException("you must seal the collector first before asking for cardinality");
+    throw new IllegalStateException(
+        "you must seal the collector first before asking for cardinality");
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
index aba55ca..71e62cb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.V1Constants.Indexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
@@ -167,7 +168,11 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
             fileExtension = V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION;
           }
         } else {
-          fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+          if (!columnMetadata.hasDictionary()) {
+            fileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+          } else {
+            fileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION;
+          }
         }
         break;
       case INVERTED_INDEX:
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
index 373c3a9..e98a185 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueVarByteRawIndexCreatorTest.java
@@ -39,7 +39,7 @@ public class MultiValueVarByteRawIndexCreatorTest {
   }
 
   @Test
-  public void testMV() throws IOException {
+  public void testMVString() throws IOException {
     String column = "testCol";
     int numDocs = 1000;
     int maxElements = 50;
@@ -78,4 +78,47 @@ public class MultiValueVarByteRawIndexCreatorTest {
       Assert.assertEquals(inputs.get(i), readValue);
     }
   }
+
+  @Test
+  public void testMVBytes() throws IOException {
+    String column = "testCol";
+    int numDocs = 1000;
+    int maxElements = 50;
+    int maxTotalLength = 500;
+    File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+    file.delete();
+    MultiValueVarByteRawIndexCreator creator = new MultiValueVarByteRawIndexCreator(
+        new File(OUTPUT_DIR), ChunkCompressionType.SNAPPY, column, numDocs, DataType.BYTES,
+        maxTotalLength, maxElements);
+    List<byte[][]> inputs = new ArrayList<>();
+    Random random = new Random();
+    for (int i = 0; i < numDocs; i++) {
+      //int length = 1;
+      int length = random.nextInt(10);
+      byte[][] values = new byte[length][];
+      for (int j = 0; j < length; j++) {
+        char[] value = new char[length];
+        Arrays.fill(value, 'a');
+        values[j] = new String(value).getBytes();
+      }
+      inputs.add(values);
+      creator.putBytesMV(values);
+    }
+    creator.close();
+
+    //read
+    final PinotDataBuffer buffer = PinotDataBuffer
+        .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
+    VarByteChunkMVForwardIndexReader reader = new VarByteChunkMVForwardIndexReader(buffer,
+        DataType.BYTES);
+    final ChunkReaderContext context = reader.createContext();
+    byte[][] values = new byte[maxElements][];
+    for (int i = 0; i < numDocs; i++) {
+      int length = reader.getBytesMV(i, values, context);
+      byte[][] readValue = Arrays.copyOf(values, length);
+      for (int j = 0; j < length; j++) {
+        Assert.assertTrue(Arrays.equals(inputs.get(i)[j], readValue[j]));
+      }
+    }
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
index 9f515e8..b9f0f15 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RawIndexCreatorTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *   http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.creator;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +33,7 @@ import org.apache.pinot.segment.local.loader.LocalSegmentDirectoryLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -55,15 +57,16 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-
 /**
  * Class for testing Raw index creators.
  */
 public class RawIndexCreatorTest {
+
   private static final int NUM_ROWS = 10009;
   private static final int MAX_STRING_LENGTH = 101;
 
-  private static final String SEGMENT_DIR_NAME = System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest";
+  private static final String SEGMENT_DIR_NAME =
+      System.getProperty("java.io.tmpdir") + File.separator + "fwdIndexTest";
   private static final String SEGMENT_NAME = "testSegment";
 
   private static final String INT_COLUMN = "intColumn";
@@ -71,6 +74,12 @@ public class RawIndexCreatorTest {
   private static final String FLOAT_COLUMN = "floatColumn";
   private static final String DOUBLE_COLUMN = "doubleColumn";
   private static final String STRING_COLUMN = "stringColumn";
+  private static final String INT_MV_COLUMN = "intMVColumn";
+  private static final String LONG_MV_COLUMN = "longMVColumn";
+  private static final String FLOAT_MV_COLUMN = "floatMVColumn";
+  private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
+  private static final String STRING_MV_COLUMN = "stringMVColumn";
+  private static final String BYTES_MV_COLUMN = "bytesMVColumn";
 
   Random _random;
   private RecordReader _recordReader;
@@ -79,8 +88,6 @@ public class RawIndexCreatorTest {
 
   /**
    * Setup to build a segment with raw indexes (no-dictionary) of various data types.
-   *
-   * @throws Exception
    */
   @BeforeClass
   public void setup()
@@ -91,8 +98,15 @@ public class RawIndexCreatorTest {
     schema.addField(new DimensionFieldSpec(FLOAT_COLUMN, DataType.FLOAT, true));
     schema.addField(new DimensionFieldSpec(DOUBLE_COLUMN, DataType.DOUBLE, true));
     schema.addField(new DimensionFieldSpec(STRING_COLUMN, DataType.STRING, true));
+    schema.addField(new DimensionFieldSpec(INT_MV_COLUMN, DataType.INT, false));
+    schema.addField(new DimensionFieldSpec(LONG_MV_COLUMN, DataType.LONG, false));
+    schema.addField(new DimensionFieldSpec(FLOAT_MV_COLUMN, DataType.FLOAT, false));
+    schema.addField(new DimensionFieldSpec(DOUBLE_MV_COLUMN, DataType.DOUBLE, false));
+    schema.addField(new DimensionFieldSpec(STRING_MV_COLUMN, DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec(BYTES_MV_COLUMN, DataType.BYTES, false));
 
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test")
+        .build();
 
     _random = new Random(System.nanoTime());
     _recordReader = buildIndex(tableConfig, schema);
@@ -109,7 +123,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for int raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testIntRawIndexCreator()
@@ -120,7 +133,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for long raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testLongRawIndexCreator()
@@ -131,7 +143,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for float raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testFloatRawIndexCreator()
@@ -142,7 +153,6 @@ public class RawIndexCreatorTest {
   /**
    * Test for double raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testDoubleRawIndexCreator()
@@ -153,19 +163,21 @@ public class RawIndexCreatorTest {
   /**
    * Test for string raw index creator.
    * Compares values read from the raw index against expected value.
-   * @throws Exception
    */
   @Test
   public void testStringRawIndexCreator()
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_COLUMN);
-    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(indexBuffer,
+    try (VarByteChunkSVForwardIndexReader rawIndexReader = new VarByteChunkSVForwardIndexReader(
+        indexBuffer,
         DataType.STRING);
-        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(rawIndexReader.getString(row, readerContext), expectedRow.getValue(STRING_COLUMN));
+        Assert.assertEquals(rawIndexReader.getString(row, readerContext),
+            expectedRow.getValue(STRING_COLUMN));
       }
     }
   }
@@ -175,17 +187,88 @@ public class RawIndexCreatorTest {
    *
    * @param column Column for which to perform the test
    * @param dataType Data type of the column
-   * @throws Exception
    */
   private void testFixedLengthRawIndexCreator(String column, DataType dataType)
       throws Exception {
     PinotDataBuffer indexBuffer = getIndexBufferForColumn(column);
-    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(indexBuffer,
-        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader.createContext()) {
+    try (FixedByteChunkSVForwardIndexReader rawIndexReader = new FixedByteChunkSVForwardIndexReader(
+        indexBuffer,
+        dataType); BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+        .createContext()) {
+      _recordReader.rewind();
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row),
+            expectedRow.getValue(column));
+      }
+    }
+  }
+
+  /**
+   * Test for multi value string raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testStringMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(STRING_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer,
+        DataType.STRING);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
       _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(STRING_MV_COLUMN).getMaxNumberOfMultiValues();
+      final String[] valueBuffer = new String[maxNumberOfMultiValues];
       for (int row = 0; row < NUM_ROWS; row++) {
         GenericRow expectedRow = _recordReader.next();
-        Assert.assertEquals(readValueFromIndex(rawIndexReader, readerContext, row), expectedRow.getValue(column));
+
+        int length = rawIndexReader.getStringMV(row, valueBuffer, readerContext);
+        String[] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(STRING_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new String[]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_STRING};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertEquals(expected, actual);
+        }
+      }
+    }
+  }
+
+  /**
+   * Test for multi value bytes raw index creator.
+   * Compares values read from the raw index against expected value.
+   */
+  @Test
+  public void testBytesMVRawIndexCreator()
+      throws Exception {
+    PinotDataBuffer indexBuffer = getIndexBufferForColumn(BYTES_MV_COLUMN);
+    try (VarByteChunkMVForwardIndexReader rawIndexReader = new VarByteChunkMVForwardIndexReader(
+        indexBuffer, DataType.BYTES);
+        BaseChunkSVForwardIndexReader.ChunkReaderContext readerContext = rawIndexReader
+            .createContext()) {
+      _recordReader.rewind();
+      int maxNumberOfMultiValues = _segmentDirectory.getSegmentMetadata()
+          .getColumnMetadataFor(BYTES_MV_COLUMN).getMaxNumberOfMultiValues();
+      final byte[][] valueBuffer = new byte[maxNumberOfMultiValues][];
+      for (int row = 0; row < NUM_ROWS; row++) {
+        GenericRow expectedRow = _recordReader.next();
+
+        int length = rawIndexReader.getBytesMV(row, valueBuffer, readerContext);
+        byte[][] readValue = Arrays.copyOf(valueBuffer, length);
+        Object[] writtenValue = (Object[]) expectedRow.getValue(BYTES_MV_COLUMN);
+        if (writtenValue == null || writtenValue.length == 0) {
+          writtenValue = new byte[][]{FieldSpec.DEFAULT_DIMENSION_NULL_VALUE_OF_BYTES};
+        }
+        for (int i = 0; i < length; i++) {
+          Object expected = writtenValue[i];
+          Object actual = readValue[i];
+          Assert.assertTrue(Arrays.equals((byte[]) expected, (byte[]) actual));
+        }
       }
     }
   }
@@ -202,10 +285,10 @@ public class RawIndexCreatorTest {
   }
 
   /**
-   * Helper method to build a segment containing a single valued string column with RAW (no-dictionary) index.
+   * Helper method to build a segment with RAW (no-dictionary) indexes, generating random
+   * single-value and multi-value data for every column in the given schema.
    *
    * @return Array of string values for the rows in the generated index.
-   * @throws Exception
    */
   private RecordReader buildIndex(TableConfig tableConfig, Schema schema)
       throws Exception {
@@ -221,9 +304,17 @@ public class RawIndexCreatorTest {
 
       for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
         Object value;
-
-        value = getRandomValue(_random, fieldSpec.getDataType());
-        map.put(fieldSpec.getName(), value);
+        if (fieldSpec.isSingleValueField()) {
+          value = getRandomValue(_random, fieldSpec.getDataType());
+          map.put(fieldSpec.getName(), value);
+        } else {
+          int length = _random.nextInt(50);
+          Object[] values = new Object[length];
+          for (int j = 0; j < length; j++) {
+            values[j] = getRandomValue(_random, fieldSpec.getDataType());
+          }
+          map.put(fieldSpec.getName(), values);
+        }
       }
 
       GenericRow genericRow = new GenericRow();
@@ -262,9 +353,15 @@ public class RawIndexCreatorTest {
         return random.nextDouble();
       case STRING:
         return StringUtil
-            .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)), Integer.MAX_VALUE);
+            .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
+                Integer.MAX_VALUE);
+      case BYTES:
+        return StringUtil
+            .sanitizeStringValue(RandomStringUtils.random(random.nextInt(MAX_STRING_LENGTH)),
+                Integer.MAX_VALUE).getBytes();
       default:
-        throw new UnsupportedOperationException("Unsupported data type for random value generator: " + dataType);
+        throw new UnsupportedOperationException(
+            "Unsupported data type for random value generator: " + dataType);
     }
   }
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index d3e53141..279ae6d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -308,7 +308,7 @@ public class DictionaryToRawIndexConverter {
     int lengthOfLongestEntry = (storedType == DataType.STRING) ? getLengthOfLongestEntry(dictionary) : -1;
 
     try (ForwardIndexCreator rawIndexCreator = SegmentColumnarIndexCreator
-        .getRawIndexCreatorForColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
+        .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
             false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
         ForwardIndexReaderContext readerContext = reader.createContext()) {
       switch (storedType) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org