Posted to commits@pinot.apache.org by si...@apache.org on 2022/06/29 06:11:28 UTC

[pinot] branch master updated: Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4152f60a84 Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)
4152f60a84 is described below

commit 4152f60a849fe38f57b95600570555a5522c0c4c
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Tue Jun 28 23:11:22 2022 -0700

    Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)
    
    * Add multi-value raw APIs and their implementation for querying immutable and mutable fixed-width segment types
    
    * Address review comments
    
    * Update pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
    
    Co-authored-by: Rong Rong <wa...@gmail.com>
    
    * Fix
    
    Co-authored-by: Rong Rong <wa...@gmail.com>
---
 .../org/apache/pinot/core/common/DataFetcher.java  |  79 +-
 .../apache/pinot/queries/DistinctQueriesTest.java  | 520 ++++++++++++-
 .../pinot/queries/MultiValueRawQueriesTest.java    | 810 +++++++++++++++++++++
 .../mutable/DefaultMutableIndexProvider.java       |  47 +-
 .../indexsegment/mutable/IntermediateSegment.java  |   2 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 160 +++-
 .../forward/FixedByteMVMutableForwardIndex.java    |  87 ++-
 .../fwd/MultiValueFixedByteRawIndexCreator.java    |   5 +-
 .../constant/ConstantMVForwardIndexReader.java     |  10 +
 ...eader.java => BaseChunkForwardIndexReader.java} |  31 +-
 .../forward/FixedBitMVForwardIndexReader.java      |  71 ++
 .../FixedByteChunkMVForwardIndexReader.java        |  56 +-
 .../FixedByteChunkSVForwardIndexReader.java        |   4 +-
 .../FixedBytePower2ChunkSVForwardIndexReader.java  |   4 +-
 .../forward/VarByteChunkMVForwardIndexReader.java  |  62 +-
 .../forward/VarByteChunkSVForwardIndexReader.java  |   4 +-
 .../startree/v2/store/StarTreeLoaderUtils.java     |   4 +-
 .../mutable/MutableSegmentImplRawMVTest.java       | 266 +++++++
 .../impl/dictionary/MultiValueDictionaryTest.java  | 169 ++++-
 .../MultiValueFixedByteRawIndexCreatorTest.java    |   3 +-
 .../FixedByteMVMutableForwardIndexTest.java        |  49 +-
 .../spi/index/mutable/MutableForwardIndex.java     | 105 +++
 .../spi/index/reader/ForwardIndexReader.java       | 389 +++++++++-
 23 files changed, 2775 insertions(+), 162 deletions(-)
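
For readers skimming the diff: the change introduces a no-dictionary (raw) read
path for multi-value (MV) columns. ForwardIndexReader and MutableForwardIndex
gain bulk MV read methods, and DataFetcher dispatches to them whenever a column
has no dictionary. A minimal sketch of the shape of the new reader API,
inferred from the call sites in the DataFetcher diff below (the interface and
helper names here are placeholders, not the actual Pinot types):

    // Placeholder interface mirroring the readValuesMV / getNumValuesMV call
    // sites visible in the DataFetcher diff; the real methods live on
    // org.apache.pinot.segment.spi.index.reader.ForwardIndexReader.
    interface MVReaderSketch<C> {
      // Fills valuesBuffer[i] with the values of docIds[i];
      // maxNumValuesPerMVEntry bounds the per-entry scratch space.
      void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry,
          int[][] valuesBuffer, C context);

      // Number of values stored for one MV entry, without materializing them.
      int getNumValuesMV(int docId, C context);
    }

    class MVReadSketch {
      // Hypothetical caller: bulk-read the int values for a batch of docIds.
      static <C> int[][] readAllIntMV(MVReaderSketch<C> reader, int[] docIds,
          int maxNumValuesPerMVEntry, C context) {
        int[][] valuesBuffer = new int[docIds.length][];
        reader.readValuesMV(docIds, docIds.length, maxNumValuesPerMVEntry,
            valuesBuffer, context);
        return valuesBuffer;
      }
    }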

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 11cae5496a..0eff70555b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -53,6 +53,7 @@ public class DataFetcher {
   //       ChunkReaderContext should be closed explicitly to release the off-heap buffer
   private final Map<String, ColumnValueReader> _columnValueReaderMap;
   private final int[] _reusableMVDictIds;
+  private final int _maxNumValuesPerMVEntry;
 
   /**
    * Constructor for DataFetcher.
@@ -74,6 +75,7 @@ public class DataFetcher {
       }
     }
     _reusableMVDictIds = new int[maxNumValuesPerMVEntry];
+    _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
   }
 
   /**
@@ -628,12 +630,15 @@ public class DataFetcher {
 
     void readIntValuesMV(int[] docIds, int length, int[][] valuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
-      assert _dictionary != null;
-      for (int i = 0; i < length; i++) {
-        int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
-        int[] values = new int[numValues];
-        _dictionary.readIntValues(_reusableMVDictIds, numValues, values);
-        valuesBuffer[i] = values;
+      if (_dictionary != null) {
+        for (int i = 0; i < length; i++) {
+          int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+          int[] values = new int[numValues];
+          _dictionary.readIntValues(_reusableMVDictIds, numValues, values);
+          valuesBuffer[i] = values;
+        }
+      } else {
+        _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
       }
     }
 
@@ -645,12 +650,15 @@ public class DataFetcher {
 
     void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
-      assert _dictionary != null;
-      for (int i = 0; i < length; i++) {
-        int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
-        long[] values = new long[numValues];
-        _dictionary.readLongValues(_reusableMVDictIds, numValues, values);
-        valuesBuffer[i] = values;
+      if (_dictionary != null) {
+        for (int i = 0; i < length; i++) {
+          int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+          long[] values = new long[numValues];
+          _dictionary.readLongValues(_reusableMVDictIds, numValues, values);
+          valuesBuffer[i] = values;
+        }
+      } else {
+        _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
       }
     }
 
@@ -662,12 +670,15 @@ public class DataFetcher {
 
     void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
-      assert _dictionary != null;
-      for (int i = 0; i < length; i++) {
-        int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
-        float[] values = new float[numValues];
-        _dictionary.readFloatValues(_reusableMVDictIds, numValues, values);
-        valuesBuffer[i] = values;
+      if (_dictionary != null) {
+        for (int i = 0; i < length; i++) {
+          int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+          float[] values = new float[numValues];
+          _dictionary.readFloatValues(_reusableMVDictIds, numValues, values);
+          valuesBuffer[i] = values;
+        }
+      } else {
+        _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
       }
     }
 
@@ -679,12 +690,15 @@ public class DataFetcher {
 
     void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
-      assert _dictionary != null;
-      for (int i = 0; i < length; i++) {
-        int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
-        double[] values = new double[numValues];
-        _dictionary.readDoubleValues(_reusableMVDictIds, numValues, values);
-        valuesBuffer[i] = values;
+      if (_dictionary != null) {
+        for (int i = 0; i < length; i++) {
+          int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+          double[] values = new double[numValues];
+          _dictionary.readDoubleValues(_reusableMVDictIds, numValues, values);
+          valuesBuffer[i] = values;
+        }
+      } else {
+        _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
       }
     }
 
@@ -696,12 +710,15 @@ public class DataFetcher {
 
     void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
-      assert _dictionary != null;
-      for (int i = 0; i < length; i++) {
-        int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
-        String[] values = new String[numValues];
-        _dictionary.readStringValues(_reusableMVDictIds, numValues, values);
-        valuesBuffer[i] = values;
+      if (_dictionary != null) {
+        for (int i = 0; i < length; i++) {
+          int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+          String[] values = new String[numValues];
+          _dictionary.readStringValues(_reusableMVDictIds, numValues, values);
+          valuesBuffer[i] = values;
+        }
+      } else {
+        _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
       }
     }
 
@@ -714,7 +731,7 @@ public class DataFetcher {
     public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) {
       Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
       for (int i = 0; i < length; i++) {
-        numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+        numValuesBuffer[i] = _reader.getNumValuesMV(docIds[i], getReaderContext());
       }
     }
 
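Two things stand out in the DataFetcher hunks above. First, each
read*ValuesMV method now branches on _dictionary != null: the existing
dictId-translation loop is kept for dictionary-encoded columns, while raw
columns delegate to the reader's bulk readValuesMV. Second, readNumValuesMV no
longer piggybacks on getDictIdMV (which presumes a dictionary) and instead asks
the reader directly via getNumValuesMV. A hedged sketch of that counting path,
reusing the placeholder MVReaderSketch interface from above:

    // Sums per-entry value counts for a batch of docIds. Because it never
    // touches dictIds, it works for dictionary-encoded and raw MV columns
    // alike. A sketch only; not a method from the actual change.
    class MVCountSketch {
      static <C> int totalNumValuesMV(MVReaderSketch<C> reader, int[] docIds,
          C context) {
        int total = 0;
        for (int docId : docIds) {
          total += reader.getNumValuesMV(docId, context);
        }
        return total;
      }
    }
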
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 6e34f36a48..9f17069f69 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -92,13 +92,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   private static final String FLOAT_MV_COLUMN = "floatMVColumn";
   private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
   private static final String STRING_MV_COLUMN = "stringMVColumn";
-
-  // TODO: Fix raw index for MV column then add tests for them
-//  private static final String RAW_INT_MV_COLUMN = "rawIntMVColumn";
-//  private static final String RAW_LONG_MV_COLUMN = "rawLongMVColumn";
-//  private static final String RAW_FLOAT_MV_COLUMN = "rawFloatMVColumn";
-//  private static final String RAW_DOUBLE_MV_COLUMN = "rawDoubleMVColumn";
-//  private static final String RAW_STRING_MV_COLUMN = "rawStringMVColumn";
+  private static final String RAW_INT_MV_COLUMN = "rawIntMVColumn";
+  private static final String RAW_LONG_MV_COLUMN = "rawLongMVColumn";
+  private static final String RAW_FLOAT_MV_COLUMN = "rawFloatMVColumn";
+  private static final String RAW_DOUBLE_MV_COLUMN = "rawDoubleMVColumn";
+  private static final String RAW_STRING_MV_COLUMN = "rawStringMVColumn";
 
   //@formatter:off
   private static final Schema SCHEMA = new Schema.SchemaBuilder()
@@ -121,19 +119,19 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       .addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
       .addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
       .addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING)
-//      .addMultiValueDimension(RAW_INT_MV_COLUMN, DataType.INT)
-//      .addMultiValueDimension(RAW_LONG_MV_COLUMN, DataType.LONG)
-//      .addMultiValueDimension(RAW_FLOAT_MV_COLUMN, DataType.FLOAT)
-//      .addMultiValueDimension(RAW_DOUBLE_MV_COLUMN, DataType.DOUBLE)
-//      .addMultiValueDimension(RAW_STRING_MV_COLUMN, DataType.STRING)
+      .addMultiValueDimension(RAW_INT_MV_COLUMN, DataType.INT)
+      .addMultiValueDimension(RAW_LONG_MV_COLUMN, DataType.LONG)
+      .addMultiValueDimension(RAW_FLOAT_MV_COLUMN, DataType.FLOAT)
+      .addMultiValueDimension(RAW_DOUBLE_MV_COLUMN, DataType.DOUBLE)
+      .addMultiValueDimension(RAW_STRING_MV_COLUMN, DataType.STRING)
       .build();
   //@formatter:on
 
   private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
       .setNoDictionaryColumns(
           Arrays.asList(RAW_INT_COLUMN, RAW_LONG_COLUMN, RAW_FLOAT_COLUMN, RAW_DOUBLE_COLUMN, RAW_BIG_DECIMAL_COLUMN,
-              RAW_STRING_COLUMN, RAW_BYTES_COLUMN/*, RAW_INT_MV_COLUMN, RAW_LONG_MV_COLUMN, RAW_FLOAT_MV_COLUMN,
-              RAW_DOUBLE_MV_COLUMN, RAW_STRING_MV_COLUMN*/)).build();
+              RAW_STRING_COLUMN, RAW_BYTES_COLUMN, RAW_INT_MV_COLUMN, RAW_LONG_MV_COLUMN, RAW_FLOAT_MV_COLUMN,
+              RAW_DOUBLE_MV_COLUMN, RAW_STRING_MV_COLUMN)).build();
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -204,11 +202,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       record.putValue(FLOAT_MV_COLUMN, mvValue);
       record.putValue(DOUBLE_MV_COLUMN, mvValue);
       record.putValue(STRING_MV_COLUMN, mvValue);
-//      record.putValue(RAW_INT_MV_COLUMN, mvValue);
-//      record.putValue(RAW_LONG_MV_COLUMN, mvValue);
-//      record.putValue(RAW_FLOAT_MV_COLUMN, mvValue);
-//      record.putValue(RAW_DOUBLE_MV_COLUMN, mvValue);
-//      record.putValue(RAW_STRING_MV_COLUMN, mvValue);
+      record.putValue(RAW_INT_MV_COLUMN, mvValue);
+      record.putValue(RAW_LONG_MV_COLUMN, mvValue);
+      record.putValue(RAW_FLOAT_MV_COLUMN, mvValue);
+      record.putValue(RAW_DOUBLE_MV_COLUMN, mvValue);
+      record.putValue(RAW_STRING_MV_COLUMN, mvValue);
       uniqueRecords.add(record);
     }
 
@@ -366,6 +364,55 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         }
       }
     }
+    {
+      // Raw MV numeric columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(rawIntMVColumn) FROM testTable",
+          "SELECT DISTINCT(rawLongMVColumn) FROM testTable",
+          "SELECT DISTINCT(rawFloatMVColumn) FROM testTable",
+          "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable"
+      );
+      //@formatter:on
+      // Define the expected result set explicitly since values from a raw column are read back in the order added
+      Set<Integer> expectedValues = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4, 100, 101, 102, 103, 104));
+      for (String query : queries) {
+        DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+        for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof Number);
+            actualValues.add(((Number) values[0]).intValue());
+          }
+          assertEquals(actualValues, expectedValues);
+        }
+      }
+    }
+    {
+      // Raw MV string column
+      //@formatter:off
+      String query = "SELECT DISTINCT(rawStringMVColumn) FROM testTable";
+      //@formatter:on
+      // Define the expected result set explicitly since values from a raw column are read back in the order added
+      Set<Integer> expectedValues = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4, 100, 101, 102, 103, 104));
+      DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+      DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+      for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+        assertEquals(distinctTable.size(), 10);
+        Set<Integer> actualValues = new HashSet<>();
+        for (Record record : distinctTable.getRecords()) {
+          Object[] values = record.getValues();
+          assertEquals(values.length, 1);
+          assertTrue(values[0] instanceof String);
+          actualValues.add(Integer.parseInt((String) values[0]));
+        }
+        assertEquals(actualValues, expectedValues);
+      }
+    }
   }
 
   @Test
@@ -562,6 +609,85 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         assertEquals(actualValues, expectedValues);
       }
     }
+    {
+      // Numeric raw MV columns ASC
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(rawIntMVColumn) FROM testTable ORDER BY rawIntMVColumn",
+          "SELECT DISTINCT(rawLongMVColumn) FROM testTable ORDER BY rawLongMVColumn",
+          "SELECT DISTINCT(rawFloatMVColumn) FROM testTable ORDER BY rawFloatMVColumn",
+          "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable ORDER BY rawDoubleMVColumn"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+        for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof Number);
+            actualValues.add(((Number) values[0]).intValue());
+          }
+          assertEquals(actualValues, expectedValues);
+        }
+      }
+    }
+    {
+      // Numeric raw MV columns DESC
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(rawIntMVColumn) FROM testTable ORDER BY rawIntMVColumn DESC",
+          "SELECT DISTINCT(rawLongMVColumn) FROM testTable ORDER BY rawLongMVColumn DESC",
+          "SELECT DISTINCT(rawFloatMVColumn) FROM testTable ORDER BY rawFloatMVColumn DESC",
+          "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable ORDER BY rawDoubleMVColumn DESC"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 10; i < 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+        DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+        for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof Number);
+            actualValues.add(((Number) values[0]).intValue());
+          }
+          assertEquals(actualValues, expectedValues);
+        }
+      }
+    }
+    {
+      // String raw MV column
+      String query = "SELECT DISTINCT(rawStringMVColumn) FROM testTable ORDER BY rawStringMVColumn";
+      Set<String> expectedValues =
+          new HashSet<>(Arrays.asList("0", "1", "10", "100", "101", "102", "103", "104", "105", "106"));
+      DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+      DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+      for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+        assertEquals(distinctTable.size(), 10);
+        Set<String> actualValues = new HashSet<>();
+        for (Record record : distinctTable.getRecords()) {
+          Object[] values = record.getValues();
+          assertEquals(values.length, 1);
+          assertTrue(values[0] instanceof String);
+          actualValues.add((String) values[0]);
+        }
+        assertEquals(actualValues, expectedValues);
+      }
+    }
   }
 
   /**
@@ -576,10 +702,15 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   <li>Selecting some columns order by raw BYTES column</li>
    *   <li>Selecting some columns transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    * </ul>
    */
   private void testDistinctInnerSegmentHelper(String[] queries) {
-    assertEquals(queries.length, 8);
+    assertEquals(queries.length, 13);
 
     // Selecting all dictionary-encoded SV columns
     // SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, bigDecimalColumn, stringColumn, bytesColumn
@@ -808,6 +939,148 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       assertEquals(distinctTable.size(), 0);
       assertFalse(distinctTable.isMainTable());
     }
+
+    // Selecting all raw MV columns
+    // SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn
+    // FROM testTable LIMIT 10000
+    {
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[8]);
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{
+          "rawIntMVColumn", "rawLongMVColumn", "rawFloatMVColumn", "rawDoubleMVColumn", "rawStringMVColumn"
+      }, new ColumnDataType[]{
+          ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+      });
+      assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where all 100 * 2^5 unique combinations should be returned
+      int numUniqueCombinations = NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 5);
+      assertEquals(distinctTable.size(), numUniqueCombinations);
+      assertTrue(distinctTable.isMainTable());
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int intValue = (Integer) values[0];
+        List<Integer> actualValueList =
+            Arrays.asList(intValue, ((Long) values[1]).intValue(), ((Float) values[2]).intValue(),
+                ((Double) values[3]).intValue(), Integer.parseInt((String) values[4]));
+        List<Integer> expectedValues = new ArrayList<>(2);
+        expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        for (Integer actualValue : actualValueList) {
+          assertTrue(expectedValues.contains(actualValue));
+        }
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numUniqueCombinations);
+    }
+
+    // Selecting some SV columns (including raw) and some raw MV columns
+    // SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable LIMIT 10000
+    {
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[9]);
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{
+          "longColumn", "rawBigDecimalColumn", "rawFloatMVColumn", "rawStringMVColumn"
+      }, new ColumnDataType[]{
+          ColumnDataType.LONG, ColumnDataType.BIG_DECIMAL, ColumnDataType.FLOAT, ColumnDataType.STRING
+      });
+      assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where all 100 * 2^2 unique combinations should be returned
+      int numUniqueCombinations = NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 2);
+      assertEquals(distinctTable.size(), numUniqueCombinations);
+      assertTrue(distinctTable.isMainTable());
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int intValue = ((Long) values[0]).intValue();
+        List<Integer> actualValueList =
+            Arrays.asList(intValue, ((BigDecimal) values[1]).intValue(), ((Float) values[2]).intValue(),
+                Integer.parseInt((String) values[3]));
+        assertEquals((int) actualValueList.get(1), intValue);
+        List<Integer> expectedMVValues = new ArrayList<>(2);
+        expectedMVValues.add(intValue);
+        expectedMVValues.add(intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        assertTrue(expectedMVValues.contains(actualValueList.get(2)));
+        assertTrue(expectedMVValues.contains(actualValueList.get(3)));
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numUniqueCombinations);
+    }
+
+    // Selecting some columns with filter, including a raw MV column
+    // SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 LIMIT 10000
+    {
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[10]);
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"stringColumn", "bytesColumn", "rawIntMVColumn"},
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.INT});
+      assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where 40 * 2 matched combinations should be returned
+      int numMatchedCombinations = (NUM_UNIQUE_RECORDS_PER_SEGMENT - 60) * 2;
+      assertEquals(distinctTable.size(), numMatchedCombinations);
+      assertTrue(distinctTable.isMainTable());
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int intValue = Integer.parseInt((String) values[0]);
+        assertTrue(intValue >= 60);
+        List<Integer> actualValueList =
+            Arrays.asList(intValue, Integer.parseInt(new String(((ByteArray) values[1]).getBytes(), UTF_8).trim()),
+                (Integer) values[2]);
+        assertEquals((int) actualValueList.get(1), intValue);
+        assertTrue((Integer) values[2] == intValue || (Integer) values[2] == intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numMatchedCombinations);
+    }
+
+    // Selecting some columns order by raw MV column
+    // SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC
+    {
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[11]);
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawDoubleMVColumn"},
+          new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.DOUBLE});
+      assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where only 10 top values should be returned
+      assertEquals(distinctTable.size(), 10);
+      assertTrue(distinctTable.isMainTable());
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(NUM_UNIQUE_RECORDS_PER_SEGMENT * 2 - i - 1);
+      }
+      Set<Integer> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int actualValue = ((Double) values[1]).intValue();
+        assertEquals(((Float) values[0]).intValue(), actualValue - NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        actualValues.add(actualValue);
+      }
+      assertEquals(actualValues, expectedValues);
+    }
+
+    // Selecting some columns with filter that does not match any record
+    // SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn
+    {
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[12]);
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawLongMVColumn"},
+          new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
+      assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where no record should be returned
+      assertEquals(distinctTable.size(), 0);
+      assertTrue(distinctTable.isMainTable());
+    }
   }
 
   /**
@@ -822,6 +1095,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   <li>Selecting some columns order by raw BYTES column</li>
    *   <li>Selecting some columns transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    * </ul>
    */
   @Test
@@ -838,7 +1116,14 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         "SELECT DISTINCT intColumn, rawBytesColumn FROM testTable ORDER BY rawBytesColumn LIMIT 5",
         "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 "
             + "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
-        "SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn"
+        "SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn",
+        "SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+            + "FROM testTable LIMIT 10000",
+        "SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+            + "LIMIT 10000",
+        "SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC",
+        "SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn"
     });
     //@formatter:on
   }
@@ -855,6 +1140,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   <li>Selecting some columns order by raw BYTES column</li>
    *   <li>Selecting some columns transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    * </ul>
    */
   @Test
@@ -879,7 +1169,18 @@ public class DistinctQueriesTest extends BaseQueriesTest {
             + "GROUP BY ADD(intColumn, floatColumn), stringColumn "
             + "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' "
-            + "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn"
+            + "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn",
+        "SELECT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+            + "FROM testTable GROUP BY rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, "
+            + "rawStringMVColumn LIMIT 10000",
+        "SELECT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+            + "GROUP BY longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 "
+            + "GROUP BY stringColumn, bytesColumn, rawIntMVColumn LIMIT 10000",
+        "SELECT floatColumn, rawDoubleMVColumn FROM testTable GROUP BY floatColumn, rawDoubleMVColumn "
+            + "ORDER BY rawDoubleMVColumn DESC",
+        "SELECT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, "
+            + "rawLongMVColumn ORDER BY rawLongMVColumn"
     });
     //@formatter:on
   }
@@ -912,11 +1213,16 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *     Selecting some columns with filter that does not match any record in one segment but matches some records in
    *     the other segment
    *   </li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    *   TODO: Support alias and add a test for that
    * </ul>
    */
   private void testDistinctInterSegmentHelper(String[] queries) {
-    assertEquals(queries.length, 9);
+    assertEquals(queries.length, 14);
 
     // Selecting all dictionary-encoded SV columns
     // SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, bigDecimalColumn, stringColumn, bytesColumn
@@ -1157,6 +1463,142 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         assertEquals((int) rows.get(i)[0], expectedValues[i]);
       }
     }
+
+    // Selecting all raw MV columns
+    // SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn
+    // FROM testTable LIMIT 10000
+    {
+      ResultTable resultTable = getBrokerResponse(queries[9]).getResultTable();
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{
+          "rawIntMVColumn", "rawLongMVColumn", "rawFloatMVColumn", "rawDoubleMVColumn", "rawStringMVColumn"
+      }, new ColumnDataType[]{
+          ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+      });
+      assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where all 200 * 2^5 unique combinations should be returned
+      int numUniqueCombinations = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 5);
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), numUniqueCombinations);
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Object[] row : rows) {
+        int intValue = (Integer) row[0];
+        List<Integer> actualValueList = Arrays.asList(intValue, ((Long) row[1]).intValue(), ((Float) row[2]).intValue(),
+            ((Double) row[3]).intValue(), Integer.parseInt((String) row[4]));
+        List<Integer> expectedValues = new ArrayList<>(2);
+        if (intValue < 1000) {
+          expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT);
+          expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        } else {
+          expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + 1000);
+          expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT + 1000);
+        }
+        for (Integer actualValue : actualValueList) {
+          assertTrue(expectedValues.contains(actualValue));
+        }
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numUniqueCombinations);
+    }
+
+    // Selecting some SV columns (including raw) and some raw MV columns
+    // SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable LIMIT 10000
+    {
+      ResultTable resultTable = getBrokerResponse(queries[10]).getResultTable();
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{
+          "longColumn", "rawBigDecimalColumn", "rawFloatMVColumn", "rawStringMVColumn"
+      }, new ColumnDataType[]{
+          ColumnDataType.LONG, ColumnDataType.BIG_DECIMAL, ColumnDataType.FLOAT, ColumnDataType.STRING
+      });
+      assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where all 200 * 2^2 unique combinations should be returned
+      int numUniqueCombinations = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 2);
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), numUniqueCombinations);
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Object[] row : rows) {
+        int intValue = ((Long) row[0]).intValue();
+        List<Integer> actualValueList =
+            Arrays.asList(intValue, ((BigDecimal) row[1]).intValue(), ((Float) row[2]).intValue(),
+                Integer.parseInt((String) row[3]));
+        assertEquals((int) actualValueList.get(1), intValue);
+        List<Integer> expectedMVValues = new ArrayList<>(2);
+        expectedMVValues.add(intValue);
+        expectedMVValues.add(intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        assertTrue(expectedMVValues.contains(actualValueList.get(2)));
+        assertTrue(expectedMVValues.contains(actualValueList.get(3)));
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numUniqueCombinations);
+    }
+
+    // Selecting some columns with filter, including a raw MV column
+    // SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000
+    {
+      ResultTable resultTable = getBrokerResponse(queries[11]).getResultTable();
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"stringColumn", "bytesColumn", "rawIntMVColumn"},
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where 140 * 2 matched combinations should be returned
+      int numMatchedCombinations = (2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60) * 2;
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), numMatchedCombinations);
+      Set<List<Integer>> actualValues = new HashSet<>();
+      for (Object[] row : rows) {
+        int intValue = Integer.parseInt((String) row[0]);
+        assertTrue(intValue >= 60);
+        List<Integer> actualValueList =
+            Arrays.asList(intValue, Integer.parseInt(new String(BytesUtils.toBytes((String) row[1]), UTF_8).trim()),
+                (Integer) row[2]);
+        assertEquals((int) actualValueList.get(1), intValue);
+        assertTrue((Integer) row[2] == intValue || (Integer) row[2] == intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        actualValues.add(actualValueList);
+      }
+      assertEquals(actualValues.size(), numMatchedCombinations);
+    }
+
+    // Selecting some columns order by raw MV column
+    // SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC
+    {
+      ResultTable resultTable = getBrokerResponse(queries[12]).getResultTable();
+
+      // Check data schema
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawDoubleMVColumn"},
+          new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.DOUBLE});
+      assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where only 10 top values should be returned
+      List<Object[]> rows = resultTable.getRows();
+      assertEquals(rows.size(), 10);
+      for (int i = 0; i < 10; i++) {
+        int expectedValue = NUM_UNIQUE_RECORDS_PER_SEGMENT * 2 + 1000 - i - 1;
+        Object[] row = rows.get(i);
+        assertEquals(((Float) row[0]).intValue(), expectedValue - NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        assertEquals(((Double) row[1]).intValue(), expectedValue);
+      }
+    }
+
+    // Selecting some columns with filter that does not match any record
+    // SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn
+    {
+      ResultTable resultTable = getBrokerResponse(queries[13]).getResultTable();
+
+      // Check data schema, where data type should be STRING for all columns
+      DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawLongMVColumn"},
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+      // Check values, where no record should be returned
+      assertTrue(resultTable.getRows().isEmpty());
+    }
   }
 
   /**
@@ -1175,6 +1617,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *     Selecting some columns with filter that does not match any record in one segment but matches some records in
    *     the other segment
    *   </li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    *   TODO: Support alias and add a test for that
    * </ul>
    */
@@ -1193,7 +1640,14 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 "
             + "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn",
-        "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5"
+        "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
+        "SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+            + "FROM testTable LIMIT 10000",
+        "SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+            + "LIMIT 10000",
+        "SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+        "SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC",
+        "SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn"
     });
     //@formatter:on
   }
@@ -1215,6 +1669,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *     Selecting some columns with filter that does not match any record in one segment but matches some records in
    *     the other segment
    *   </li>
+   *   <li>Selecting all raw MV columns</li>
+   *   <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+   *   <li>Selecting some columns with filter, including a raw MV column</li>
+   *   <li>Selecting some columns order by raw MV column</li>
+   *   <li>Selecting some columns with filter that does not match any record, including a raw MV column</li>
    *   TODO: Support alias and add a test for that
    * </ul>
    */
@@ -1241,7 +1700,18 @@ public class DistinctQueriesTest extends BaseQueriesTest {
             + "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' "
             + "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn",
-        "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5"
+        "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5",
+        "SELECT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn FROM testTable "
+            + "GROUP BY rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+            + "LIMIT 10000",
+        "SELECT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+            + "GROUP BY longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn LIMIT 10000",
+        "SELECT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 GROUP BY "
+            + "stringColumn, bytesColumn, rawIntMVColumn LIMIT 10000",
+        "SELECT floatColumn, rawDoubleMVColumn FROM testTable GROUP BY floatColumn, rawDoubleMVColumn "
+            + "ORDER BY rawDoubleMVColumn DESC",
+        "SELECT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, "
+            + "rawLongMVColumn ORDER BY rawLongMVColumn"
     });
     //@formatter:on
   }
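
The new MultiValueRawQueriesTest below builds two segments where every unique
record i carries the MV entry {base + i, base + i + 100} in each MV column,
dictionary-encoded and raw alike, so each raw result can be cross-checked
against its dictionary-encoded twin. The DISTINCT expectations in both test
classes rely on MV semantics where every entry is flattened into its
individual values. A small self-contained illustration (constants mirror the
test; the class name is made up):

    import java.util.Set;
    import java.util.TreeSet;

    // Expected DISTINCT result over one segment's MV column: every value of
    // every MV entry counts individually, and duplicates collapse.
    class MVDistinctExpectation {
      public static void main(String[] args) {
        int numUniqueRecords = 10; // NUM_UNIQUE_RECORDS_PER_SEGMENT in the test
        int mvOffset = 100;        // MV_OFFSET in the test
        Set<Integer> distinct = new TreeSet<>();
        for (int i = 0; i < numUniqueRecords; i++) {
          distinct.add(i);            // first value of entry {i, i + mvOffset}
          distinct.add(i + mvOffset); // second value of the same entry
        }
        System.out.println(distinct.size() + " distinct values: " + distinct);
        // Prints 20 distinct values: 0..9 and 100..109
      }
    }
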
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java
new file mode 100644
index 0000000000..b026717400
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java
@@ -0,0 +1,810 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+// TODO: Add tests for more query patterns when additional fixes for MV raw columns are made
+public class MultiValueRawQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "MultiValueRawQueriesTest");
+
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME_1 = "testSegment1";
+  private static final String SEGMENT_NAME_2 = "testSegment2";
+
+  private static final int NUM_UNIQUE_RECORDS_PER_SEGMENT = 10;
+  private static final int NUM_DUPLICATES_PER_RECORDS = 2;
+  private static final int MV_OFFSET = 100;
+  private static final int BASE_VALUE_1 = 0;
+  private static final int BASE_VALUE_2 = 1000;
+
+  private static final String SV_INT_COL = "svIntCol";
+  private static final String MV_INT_COL = "mvIntCol";
+  private static final String MV_LONG_COL = "mvLongCol";
+  private static final String MV_FLOAT_COL = "mvFloatCol";
+  private static final String MV_DOUBLE_COL = "mvDoubleCol";
+  private static final String MV_STRING_COL = "mvStringCol";
+  private static final String MV_RAW_INT_COL = "mvRawIntCol";
+  private static final String MV_RAW_LONG_COL = "mvRawLongCol";
+  private static final String MV_RAW_FLOAT_COL = "mvRawFloatCol";
+  private static final String MV_RAW_DOUBLE_COL = "mvRawDoubleCol";
+  private static final String MV_RAW_STRING_COL = "mvRawStringCol";
+
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+      .addSingleValueDimension(SV_INT_COL, FieldSpec.DataType.INT)
+      .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+      .addMultiValueDimension(MV_LONG_COL, FieldSpec.DataType.LONG)
+      .addMultiValueDimension(MV_FLOAT_COL, FieldSpec.DataType.FLOAT)
+      .addMultiValueDimension(MV_DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+      .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+      .addMultiValueDimension(MV_RAW_INT_COL, FieldSpec.DataType.INT)
+      .addMultiValueDimension(MV_RAW_LONG_COL, FieldSpec.DataType.LONG)
+      .addMultiValueDimension(MV_RAW_FLOAT_COL, FieldSpec.DataType.FLOAT)
+      .addMultiValueDimension(MV_RAW_DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+      .addMultiValueDimension(MV_RAW_STRING_COL, FieldSpec.DataType.STRING)
+      .build();
+
+  private static final DataSchema DATA_SCHEMA = new DataSchema(new String[]{"mvDoubleCol", "mvFloatCol", "mvIntCol",
+      "mvLongCol", "mvRawDoubleCol", "mvRawFloatCol", "mvRawIntCol", "mvRawLongCol", "mvRawStringCol", "mvStringCol",
+      "svIntCol"},
+      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.FLOAT_ARRAY,
+          DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+          DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.FLOAT_ARRAY,
+          DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+          DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY,
+          DataSchema.ColumnDataType.INT});
+
+  private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(
+          Arrays.asList(MV_RAW_INT_COL, MV_RAW_LONG_COL, MV_RAW_FLOAT_COL, MV_RAW_DOUBLE_COL, MV_RAW_STRING_COL))
+      .build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    ImmutableSegment segment1 = createSegment(generateRecords(BASE_VALUE_1), SEGMENT_NAME_1);
+    ImmutableSegment segment2 = createSegment(generateRecords(BASE_VALUE_2), SEGMENT_NAME_2);
+    _indexSegment = segment1;
+    _indexSegments = Arrays.asList(segment1, segment2);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  /**
+   * Helper method to generate records based on the given base value.
+   *
+   * All columns will have the same value but different data types.
+   * For the {i}th unique record, the SV value is {baseValue + i} and each MV entry is {value, value + MV_OFFSET}.
+   */
+  private List<GenericRow> generateRecords(int baseValue) {
+    List<GenericRow> uniqueRecords = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT);
+    for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+      int value = baseValue + i;
+      GenericRow record = new GenericRow();
+      record.putValue(SV_INT_COL, value);
+      Integer[] mvValue = new Integer[]{value, value + MV_OFFSET};
+      record.putValue(MV_INT_COL, mvValue);
+      record.putValue(MV_LONG_COL, mvValue);
+      record.putValue(MV_FLOAT_COL, mvValue);
+      record.putValue(MV_DOUBLE_COL, mvValue);
+      record.putValue(MV_STRING_COL, mvValue);
+      record.putValue(MV_RAW_INT_COL, mvValue);
+      record.putValue(MV_RAW_LONG_COL, mvValue);
+      record.putValue(MV_RAW_FLOAT_COL, mvValue);
+      record.putValue(MV_RAW_DOUBLE_COL, mvValue);
+      record.putValue(MV_RAW_STRING_COL, mvValue);
+      uniqueRecords.add(record);
+    }
+
+    List<GenericRow> records = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT * NUM_DUPLICATES_PER_RECORDS);
+    for (int i = 0; i < NUM_DUPLICATES_PER_RECORDS; i++) {
+      records.addAll(uniqueRecords);
+    }
+    return records;
+  }
+
+  private ImmutableSegment createSegment(List<GenericRow> records, String segmentName)
+      throws Exception {
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
+  }
+
+  @Test
+  public void testSelectQueries() {
+    {
+      // Select * query
+      String query = "SELECT * from testTable ORDER BY svIntCol LIMIT 40";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+      assertNotNull(resultTable);
+      assertEquals(resultTable.getDataSchema(), DATA_SCHEMA);
+      List<Object[]> recordRows = resultTable.getRows();
+      assertEquals(recordRows.size(), 40);
+
+      Set<Integer> expectedValuesFirst = new HashSet<>();
+      Set<Integer> expectedValuesSecond = new HashSet<>();
+      for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValuesFirst.add(i);
+        expectedValuesSecond.add(i + MV_OFFSET);
+      }
+
+      Set<Integer> actualValuesFirst = new HashSet<>();
+      Set<Integer> actualValuesSecond = new HashSet<>();
+      for (int i = 0; i < 40; i++) {
+        Object[] values = recordRows.get(i);
+        assertEquals(values.length, 11);
+        int svIntValue = (int) values[10];
+        int[] intValues = (int[]) values[2];
+        assertEquals(intValues[1] - intValues[0], MV_OFFSET);
+        assertEquals(svIntValue, intValues[0]);
+
+        int[] intValuesRaw = (int[]) values[6];
+        assertEquals(intValues[0], intValuesRaw[0]);
+        assertEquals(intValues[1], intValuesRaw[1]);
+
+        long[] longValues = (long[]) values[3];
+        long[] longValuesRaw = (long[]) values[7];
+        assertEquals(longValues[0], intValues[0]);
+        assertEquals(longValues[1], intValues[1]);
+        assertEquals(longValues[0], longValuesRaw[0]);
+        assertEquals(longValues[1], longValuesRaw[1]);
+
+        float[] floatValues = (float[]) values[1];
+        float[] floatValuesRaw = (float[]) values[5];
+        assertEquals(floatValues[0], (float) intValues[0]);
+        assertEquals(floatValues[1], (float) intValues[1]);
+        assertEquals(floatValues[0], floatValuesRaw[0]);
+        assertEquals(floatValues[1], floatValuesRaw[1]);
+
+        double[] doubleValues = (double[]) values[0];
+        double[] doubleValuesRaw = (double[]) values[4];
+        assertEquals(doubleValues[0], (double) intValues[0]);
+        assertEquals(doubleValues[1], (double) intValues[1]);
+        assertEquals(doubleValues[0], doubleValuesRaw[0]);
+        assertEquals(doubleValues[1], doubleValuesRaw[1]);
+
+        String[] stringValues = (String[]) values[8];
+        String[] stringValuesRaw = (String[]) values[9];
+        assertEquals(Integer.parseInt(stringValues[0]), intValues[0]);
+        assertEquals(Integer.parseInt(stringValues[1]), intValues[1]);
+        assertEquals(stringValues[0], stringValuesRaw[0]);
+        assertEquals(stringValues[1], stringValuesRaw[1]);
+
+        actualValuesFirst.add(intValues[0]);
+        actualValuesSecond.add(intValues[1]);
+      }
+      assertTrue(actualValuesFirst.containsAll(expectedValuesFirst));
+      assertTrue(actualValuesSecond.containsAll(expectedValuesSecond));
+    }
+    {
+      // Select some dict-based MV and some raw MV columns; validate that the values match for the corresponding rows
+      String query = "SELECT mvIntCol, mvDoubleCol, mvStringCol, mvRawIntCol, mvRawDoubleCol, mvRawStringCol, svIntCol "
+          + "from testTable ORDER BY svIntCol LIMIT 40";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "mvIntCol", "mvDoubleCol", "mvStringCol", "mvRawIntCol", "mvRawDoubleCol", "mvRawStringCol", "svIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.DOUBLE_ARRAY,
+          DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.INT_ARRAY,
+          DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY,
+          DataSchema.ColumnDataType.INT
+      });
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> recordRows = resultTable.getRows();
+      assertEquals(recordRows.size(), 40);
+
+      Set<Integer> expectedValuesFirst = new HashSet<>();
+      Set<Integer> expectedValuesSecond = new HashSet<>();
+      for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValuesFirst.add(i);
+        expectedValuesSecond.add(i + MV_OFFSET);
+      }
+
+      Set<Integer> actualValuesFirst = new HashSet<>();
+      Set<Integer> actualValuesSecond = new HashSet<>();
+      for (int i = 0; i < 40; i++) {
+        Object[] values = recordRows.get(i);
+        assertEquals(values.length, 7);
+        int[] intValues = (int[]) values[0];
+        assertEquals(intValues[1] - intValues[0], MV_OFFSET);
+
+        int[] intValuesRaw = (int[]) values[3];
+        assertEquals(intValues[0], intValuesRaw[0]);
+        assertEquals(intValues[1], intValuesRaw[1]);
+
+        double[] doubleValues = (double[]) values[1];
+        double[] doubleValuesRaw = (double[]) values[4];
+        assertEquals(doubleValues[0], (double) intValues[0]);
+        assertEquals(doubleValues[1], (double) intValues[1]);
+        assertEquals(doubleValues[0], doubleValuesRaw[0]);
+        assertEquals(doubleValues[1], doubleValuesRaw[1]);
+
+        String[] stringValues = (String[]) values[2];
+        String[] stringValuesRaw = (String[]) values[5];
+        assertEquals(Integer.parseInt(stringValues[0]), intValues[0]);
+        assertEquals(Integer.parseInt(stringValues[1]), intValues[1]);
+        assertEquals(stringValues[0], stringValuesRaw[0]);
+        assertEquals(stringValues[1], stringValuesRaw[1]);
+
+        assertEquals(intValues[0], (int) values[6]);
+        assertEquals(intValuesRaw[0], (int) values[6]);
+
+        actualValuesFirst.add(intValues[0]);
+        actualValuesSecond.add(intValues[1]);
+      }
+      assertTrue(actualValuesFirst.containsAll(expectedValuesFirst));
+      assertTrue(actualValuesSecond.containsAll(expectedValuesSecond));
+    }
+    {
+      // Test a SELECT with an ARRAYLENGTH transform function
+      String query = "SELECT ARRAYLENGTH(mvRawLongCol), ARRAYLENGTH(mvLongCol) from testTable LIMIT 10";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = new DataSchema(new String[]{"arraylength(mvRawLongCol)", "arraylength(mvLongCol)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> recordRows = resultTable.getRows();
+      assertEquals(recordRows.size(), 10);
+
+      for (int i = 0; i < 10; i++) {
+        Object[] values = recordRows.get(i);
+        assertEquals(values.length, 2);
+        int intRawVal = (int) values[0];
+        int intVal = (int) values[1];
+        assertEquals(intRawVal, 2);
+        assertEquals(intVal, intRawVal);
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleAggregateQueries() {
+    {
+      // Aggregation on int columns
+      String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+          + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+          + "AVGMV(mvRawIntCol) from testTable";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+          "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE
+      });
+      validateSimpleAggregateQueryResults(resultTable, dataSchema);
+    }
+    {
+      // Aggregation on long columns
+      String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+          + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+          + "AVGMV(mvRawLongCol) from testTable";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+          "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE
+      });
+      validateSimpleAggregateQueryResults(resultTable, dataSchema);
+    }
+    {
+      // Aggregation on float columns
+      String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+          + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+          + "AVGMV(mvRawFloatCol) from testTable";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+          "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+          "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE
+      });
+      validateSimpleAggregateQueryResults(resultTable, dataSchema);
+    }
+    {
+      // Aggregation on double columns
+      String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+          + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+          + "AVGMV(mvRawDoubleCol) from testTable";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+          "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+          "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE
+      });
+      validateSimpleAggregateQueryResults(resultTable, dataSchema);
+    }
+    {
+      // Aggregation on string columns
+      String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+          + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+          + "AVGMV(mvRawStringCol) from testTable";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+          "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+          "avgmv(mvStringCol)", "avgmv(mvRawStringCol)"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE
+      });
+      validateSimpleAggregateQueryResults(resultTable, dataSchema);
+    }
+  }
+
+  private void validateSimpleAggregateQueryResults(ResultTable resultTable, DataSchema expectedDataSchema) {
+    assertNotNull(resultTable);
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> recordRows = resultTable.getRows();
+    assertEquals(recordRows.size(), 1);
+
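+    // Each doc stores the MV pair (i, i + MV_OFFSET), so COUNTMV is 2 * total docs, and the SUMMV / MINMV /
+    // MAXMV / AVGMV results below are computed over both entries of every pair.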
+    Object[] values = recordRows.get(0);
+    long countInt = (long) values[0];
+    long countIntRaw = (long) values[1];
+    assertEquals(countInt, 160);
+    assertEquals(countInt, countIntRaw);
+
+    double sumInt = (double) values[2];
+    double sumIntRaw = (double) values[3];
+    assertEquals(sumInt, 88720.0);
+    assertEquals(sumInt, sumIntRaw);
+
+    double minInt = (double) values[4];
+    double minIntRaw = (double) values[5];
+    assertEquals(minInt, 0.0);
+    assertEquals(minInt, minIntRaw);
+
+    double maxInt = (double) values[6];
+    double maxIntRaw = (double) values[7];
+    assertEquals(maxInt, 1109.0);
+    assertEquals(maxInt, maxIntRaw);
+
+    double avgInt = (double) values[8];
+    double avgIntRaw = (double) values[9];
+    assertEquals(avgInt, 554.5);
+    assertEquals(avgInt, avgIntRaw);
+  }
+
+  @Test
+  public void testAggregateWithGroupByQueries() {
+    {
+      // Aggregation on int columns with group by
+      String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+          + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+          + "AVGMV(mvRawIntCol), svIntCol, mvRawLongCol from testTable GROUP BY svIntCol, mvRawLongCol "
+          + "ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+          "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+          "svIntCol", "mvRawLongCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+    }
+    {
+      // Aggregation on long columns with group by
+      String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+          + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+          + "AVGMV(mvRawLongCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+          + "ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+          "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)",
+          "svIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+    }
+    {
+      // Aggregation on float columns with group by
+      String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+          + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+          + "AVGMV(mvRawFloatCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+          + "ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+          "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+          "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)", "svIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+    }
+    {
+      // Aggregation on double columns with group by
+      String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+          + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+          + "AVGMV(mvRawDoubleCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+          + "ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+          "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+          "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)", "svIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+    }
+    {
+      // Aggregation on string columns with group by
+      String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+          + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+          + "AVGMV(mvRawStringCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+          + "ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+          "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+          "avgmv(mvStringCol)", "avgmv(mvRawStringCol)", "svIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+    }
+    {
+      // Aggregation on int columns with group by on 3 columns
+      String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+          + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+          + "AVGMV(mvRawIntCol), svIntCol, mvLongCol, mvRawLongCol from testTable GROUP BY svIntCol, mvLongCol, "
+          + "mvRawLongCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+          "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+          "svIntCol", "mvLongCol", "mvRawLongCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
+          DataSchema.ColumnDataType.LONG
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+    }
+    {
+      // Aggregation on long columns with group by on 3 columns
+      String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+          + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+          + "AVGMV(mvRawLongCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol, "
+          + "mvRawIntCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+          "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)",
+          "svIntCol", "mvIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+          DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+    }
+    {
+      // Aggregation on float columns with group by on 3 columns
+      String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+          + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+          + "AVGMV(mvRawFloatCol), svIntCol, mvIntCol, mvRawIntCol  from testTable GROUP BY svIntCol, mvIntCol, "
+          + "mvRawIntCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+          "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+          "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+          DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+    }
+    {
+      // Aggregation on double columns with group by on 3 columns
+      String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+          + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+          + "AVGMV(mvRawDoubleCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol, "
+          + "mvRawIntCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+          "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+          "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+          DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+    }
+    {
+      // Aggregation on string columns with group by on 3 columns
+      String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+          + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+          + "AVGMV(mvRawStringCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol,"
+          + "mvRawIntCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+          "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+          "avgmv(mvStringCol)", "avgmv(mvRawStringCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+          DataSchema.ColumnDataType.INT
+      });
+      validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+    }
+    {
+      // Aggregation on int columns with group by on 3 columns, two of them RAW
+      String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+          + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+          + "AVGMV(mvRawIntCol), svIntCol, mvRawLongCol, mvRawFloatCol from testTable GROUP BY svIntCol, mvRawLongCol, "
+          + "mvRawFloatCol ORDER BY svIntCol";
+      ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+      DataSchema dataSchema = new DataSchema(new String[]{
+          "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+          "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+          "svIntCol", "mvRawLongCol", "mvRawFloatCol"
+      }, new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+          DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
+          DataSchema.ColumnDataType.FLOAT
+      });
+      assertNotNull(resultTable);
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> recordRows = resultTable.getRows();
+      assertEquals(recordRows.size(), 10);
+
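+      // The two MV group-by columns each contribute two values per doc, giving 2 * 2 = 4 groups per svIntCol
+      // value, so only svIntCol values 0-2 appear in the first 10 ordered rows.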
+      int[] expectedSVIntValues = new int[]{0, 0, 0, 0, 1, 1, 1, 1, 2, 2};
+
+      for (int i = 0; i < 10; i++) {
+        Object[] values = recordRows.get(i);
+        assertEquals(values.length, 13);
+
+        long count = (long) values[0];
+        long countRaw = (long) values[1];
+        assertEquals(count, 8);
+        assertEquals(count, countRaw);
+
+        double sum = (double) values[2];
+        double sumRaw = (double) values[3];
+        assertEquals(sum, sumRaw);
+
+        double min = (double) values[4];
+        double minRaw = (double) values[5];
+        assertEquals(min, minRaw);
+
+        double max = (double) values[6];
+        double maxRaw = (double) values[7];
+        assertEquals(max, maxRaw);
+
+        assertEquals(max - min, (double) MV_OFFSET);
+
+        double avg = (double) values[8];
+        double avgRaw = (double) values[9];
+        assertEquals(avg, avgRaw);
+
+        assertEquals((int) values[10], expectedSVIntValues[i]);
+
+        assertTrue((long) values[11] == expectedSVIntValues[i]
+            || (long) values[11] == expectedSVIntValues[i] + MV_OFFSET);
+
+        assertTrue((float) values[12] == (float) expectedSVIntValues[i]
+            || (float) values[12] == (float) (expectedSVIntValues[i] + MV_OFFSET));
+      }
+    }
+  }
+
+  private void validateAggregateWithGroupByQueryResults(ResultTable resultTable, DataSchema expectedDataSchema,
+      boolean isThreeColumnGroupBy) {
+    assertNotNull(resultTable);
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> recordRows = resultTable.getRows();
+    assertEquals(recordRows.size(), 10);
+
+    int[] expectedSVIntValues;
+
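+    // A doc expands into one group per combination of its MV values: one MV group-by column yields 2 groups
+    // per svIntCol value (0, 0, 1, 1, ...), while two MV columns yield 2 * 2 = 4 (0, 0, 0, 0, 1, ...).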
+    if (isThreeColumnGroupBy) {
+      expectedSVIntValues = new int[]{0, 0, 0, 0, 1, 1, 1, 1, 2, 2};
+    } else {
+      expectedSVIntValues = new int[]{0, 0, 1, 1, 2, 2, 3, 3, 4, 4};
+    }
+
+    for (int i = 0; i < 10; i++) {
+      Object[] values = recordRows.get(i);
+      if (isThreeColumnGroupBy) {
+        assertEquals(values.length, 13);
+      } else {
+        assertEquals(values.length, 12);
+      }
+
+      long count = (long) values[0];
+      long countRaw = (long) values[1];
+      assertEquals(count, 8);
+      assertEquals(count, countRaw);
+
+      double sum = (double) values[2];
+      double sumRaw = (double) values[3];
+      assertEquals(sum, sumRaw);
+
+      double min = (double) values[4];
+      double minRaw = (double) values[5];
+      assertEquals(min, minRaw);
+
+      double max = (double) values[6];
+      double maxRaw = (double) values[7];
+      assertEquals(max, maxRaw);
+
+      assertEquals(max - min, (double) MV_OFFSET);
+
+      double avg = (double) values[8];
+      double avgRaw = (double) values[9];
+      assertEquals(avg, avgRaw);
+
+      assertEquals((int) values[10], expectedSVIntValues[i]);
+
+      if (expectedDataSchema.getColumnDataType(11) == DataSchema.ColumnDataType.LONG) {
+        assertTrue((long) values[11] == expectedSVIntValues[i]
+            || (long) values[11] == expectedSVIntValues[i] + MV_OFFSET);
+      } else {
+        assertTrue((int) values[11] == expectedSVIntValues[i]
+            || (int) values[11] == expectedSVIntValues[i] + MV_OFFSET);
+      }
+
+      if (isThreeColumnGroupBy) {
+        if (expectedDataSchema.getColumnDataType(12) == DataSchema.ColumnDataType.LONG) {
+          assertTrue((long) values[12] == expectedSVIntValues[i]
+              || (long) values[12] == expectedSVIntValues[i] + MV_OFFSET);
+        } else {
+          assertTrue((int) values[12] == expectedSVIntValues[i]
+              || (int) values[12] == expectedSVIntValues[i] + MV_OFFSET);
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
index bbe9a7f286..9a03397157 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
@@ -51,24 +51,35 @@ public class DefaultMutableIndexProvider implements MutableIndexProvider {
     FieldSpec.DataType storedType = context.getFieldSpec().getDataType().getStoredType();
     boolean isSingleValue = context.getFieldSpec().isSingleValueField();
     if (!context.hasDictionary()) {
-      // No dictionary column must be single-valued
-      assert isSingleValue;
-      String allocationContext =
-          buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
-              V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
-      if (storedType.isFixedWidth()) {
-        return new FixedByteSVMutableForwardIndex(false, storedType, context.getCapacity(), context.getMemoryManager(),
-            allocationContext);
+      if (isSingleValue) {
+        String allocationContext =
+            buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
+                V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+        if (storedType.isFixedWidth()) {
+          return new FixedByteSVMutableForwardIndex(false, storedType, context.getCapacity(),
+              context.getMemoryManager(), allocationContext);
+        } else {
+          // RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns from previous consuming
+          // segments
+          // TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary
+          //       columns as well
+          // TODO: Use the stats to get estimated average length
+          // Use a smaller capacity as opposed to segment flush size
+          int initialCapacity = Math.min(context.getCapacity(),
+              NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+          return new VarByteSVMutableForwardIndex(storedType, context.getMemoryManager(), allocationContext,
+              initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
+        }
       } else {
-        // RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns from previous consuming
-        // segments
-        // TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary
-        //       columns as well
-        // TODO: Use the stats to get estimated average length
-        // Use a smaller capacity as opposed to segment flush size
-        int initialCapacity = Math.min(context.getCapacity(), NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
-        return new VarByteSVMutableForwardIndex(storedType, context.getMemoryManager(), allocationContext,
-            initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
+        // TODO: Add support for variable width (bytes, string, big decimal) MV RAW column types
+        assert storedType.isFixedWidth();
+        String allocationContext =
+            buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
+                V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+        // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+        return new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, context.getAvgNumMultiValues(),
+            context.getCapacity(), storedType.size(), context.getMemoryManager(), allocationContext, false,
+            storedType);
       }
     } else {
       if (isSingleValue) {
@@ -82,7 +93,7 @@ public class DefaultMutableIndexProvider implements MutableIndexProvider {
         // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
         return new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, context.getAvgNumMultiValues(),
             context.getCapacity(), Integer.BYTES,
-            context.getMemoryManager(), allocationContext);
+            context.getMemoryManager(), allocationContext, true, INT);
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
index c5f013a3d8..481829c86e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
@@ -153,7 +153,7 @@ public class IntermediateSegment implements MutableSegment {
         // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
         forwardIndex =
             new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, DEFAULT_AVG_MULTI_VALUE_COUNT, _capacity,
-                Integer.BYTES, _memoryManager, allocationContext);
+                Integer.BYTES, _memoryManager, allocationContext, true, DataType.INT);
       }
 
       _indexContainerMap.put(column,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 1f17cd682e..179b1fe795 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -436,9 +436,10 @@ public class MutableSegmentImpl implements MutableSegment {
             column, dataType.toString());
         return false;
       }
-      // So don't create dictionary if the column is member of noDictionary, is single-value
-      // and doesn't have an inverted index
-      return fieldSpec.isSingleValueField() && !invertedIndexColumns.contains(column);
+      // So don't create a dictionary if the column (1) is a member of noDictionary, (2) is either single-value or
+      // multi-value with a fixed-width data type, and (3) doesn't have an inverted index
+      return (fieldSpec.isSingleValueField() || fieldSpec.getDataType().isFixedWidth())
+          && !invertedIndexColumns.contains(column);
     }
     // column is not a part of noDictionary set, so create dictionary
     return false;
@@ -764,26 +765,75 @@ public class MutableSegmentImpl implements MutableSegment {
           }
         }
       } else {
-        // Multi-value column (always dictionary-encoded)
+        // Multi-value column
 
         int[] dictIds = indexContainer._dictIds;
 
-        // Update numValues info
-        indexContainer._numValuesInfo.updateMVEntry(dictIds.length);
+        if (dictIds != null) {
+          // Dictionary encoded
+          // Update numValues info
+          indexContainer._numValuesInfo.updateMVEntry(dictIds.length);
 
-        // Update forward index
-        indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
+          // Update forward index
+          indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
 
-        // Update inverted index
-        MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
-        if (invertedIndex != null) {
-          for (int dictId : dictIds) {
-            try {
-              invertedIndex.add(dictId, docId);
-            } catch (Exception e) {
-              recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+          // Update inverted index
+          MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
+          if (invertedIndex != null) {
+            for (int dictId : dictIds) {
+              try {
+                invertedIndex.add(dictId, docId);
+              } catch (Exception e) {
+                recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+              }
             }
           }
+        } else {
+          // Raw (no-dictionary) MV column
+
+          // Update forward index and numValues info
+          DataType dataType = fieldSpec.getDataType();
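+          // GenericRow holds MV values as an Object[]; unbox into the primitive array expected by the raw
+          // forward index before writing.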
+          switch (dataType.getStoredType()) {
+            case INT:
+              Object[] values = (Object[]) value;
+              int[] intValues = new int[values.length];
+              for (int i = 0; i < values.length; i++) {
+                intValues[i] = (Integer) values[i];
+              }
+              indexContainer._forwardIndex.setIntMV(docId, intValues);
+              indexContainer._numValuesInfo.updateMVEntry(intValues.length);
+              break;
+            case LONG:
+              values = (Object[]) value;
+              long[] longValues = new long[values.length];
+              for (int i = 0; i < values.length; i++) {
+                longValues[i] = (Long) values[i];
+              }
+              indexContainer._forwardIndex.setLongMV(docId, longValues);
+              indexContainer._numValuesInfo.updateMVEntry(longValues.length);
+              break;
+            case FLOAT:
+              values = (Object[]) value;
+              float[] floatValues = new float[values.length];
+              for (int i = 0; i < values.length; i++) {
+                floatValues[i] = (Float) values[i];
+              }
+              indexContainer._forwardIndex.setFloatMV(docId, floatValues);
+              indexContainer._numValuesInfo.updateMVEntry(floatValues.length);
+              break;
+            case DOUBLE:
+              values = (Object[]) value;
+              double[] doubleValues = new double[values.length];
+              for (int i = 0; i < values.length; i++) {
+                doubleValues[i] = (Double) values[i];
+              }
+              indexContainer._forwardIndex.setDoubleMV(docId, doubleValues);
+              indexContainer._numValuesInfo.updateMVEntry(doubleValues.length);
+              break;
+            default:
+              throw new UnsupportedOperationException(
+                  "Unsupported data type: " + dataType + " for MV no-dictionary column: " + column);
+          }
         }
       }
     }
@@ -982,24 +1032,66 @@ public class MutableSegmentImpl implements MutableSegment {
       }
     } else {
       // Raw index based
-      // TODO: support multi-valued column
-      switch (forwardIndex.getValueType()) {
-        case INT:
-          return forwardIndex.getInt(docId);
-        case LONG:
-          return forwardIndex.getLong(docId);
-        case FLOAT:
-          return forwardIndex.getFloat(docId);
-        case DOUBLE:
-          return forwardIndex.getDouble(docId);
-        case BIG_DECIMAL:
-          return forwardIndex.getBigDecimal(docId);
-        case STRING:
-          return forwardIndex.getString(docId);
-        case BYTES:
-          return forwardIndex.getBytes(docId);
-        default:
-          throw new IllegalStateException();
+      if (forwardIndex.isSingleValue()) {
+        switch (forwardIndex.getValueType()) {
+          case INT:
+            return forwardIndex.getInt(docId);
+          case LONG:
+            return forwardIndex.getLong(docId);
+          case FLOAT:
+            return forwardIndex.getFloat(docId);
+          case DOUBLE:
+            return forwardIndex.getDouble(docId);
+          case BIG_DECIMAL:
+            return forwardIndex.getBigDecimal(docId);
+          case STRING:
+            return forwardIndex.getString(docId);
+          case BYTES:
+            return forwardIndex.getBytes(docId);
+          default:
+            throw new IllegalStateException();
+        }
+      } else {
+        // TODO: support multi-value columns for variable-length value types (big decimal, string, bytes)
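+        // The raw MV readers return primitive arrays; box them into an Object[] so callers see the same
+        // shape as the dictionary-encoded path above.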
+        int numValues;
+        Object[] value;
+        switch (forwardIndex.getValueType()) {
+          case INT:
+            int[] intValues = forwardIndex.getIntMV(docId);
+            numValues = intValues.length;
+            value = new Object[numValues];
+            for (int i = 0; i < numValues; i++) {
+              value[i] = intValues[i];
+            }
+            return value;
+          case LONG:
+            long[] longValues = forwardIndex.getLongMV(docId);
+            numValues = longValues.length;
+            value = new Object[numValues];
+            for (int i = 0; i < numValues; i++) {
+              value[i] = longValues[i];
+            }
+            return value;
+          case FLOAT:
+            float[] floatValues = forwardIndex.getFloatMV(docId);
+            numValues = floatValues.length;
+            value = new Object[numValues];
+            for (int i = 0; i < numValues; i++) {
+              value[i] = floatValues[i];
+            }
+            return value;
+          case DOUBLE:
+            double[] doubleValues = forwardIndex.getDoubleMV(docId);
+            numValues = doubleValues.length;
+            value = new Object[numValues];
+            for (int i = 0; i < numValues; i++) {
+              value[i] = doubleValues[i];
+            }
+            return value;
+          default:
+            throw new IllegalStateException("No support for MV no dictionary column of type "
+                + forwardIndex.getValueType());
+        }
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
index f08f95e23d..b962402ab3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
@@ -26,6 +26,7 @@ import org.apache.pinot.segment.local.io.writer.impl.FixedByteSingleValueMultiCo
 import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +115,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
   private final int _rowCountPerChunk;
   private final PinotDataBufferMemoryManager _memoryManager;
   private final String _context;
+  private final boolean _isDictionaryEncoded;
+  private final FieldSpec.DataType _valueType;
 
   private FixedByteSingleValueMultiColWriter _curHeaderWriter;
   private FixedByteSingleValueMultiColWriter _currentDataWriter;
@@ -122,7 +125,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
   private int _prevRowLength = 0;  // Number of values in the column for the last row added.
 
   public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int avgMultiValueCount, int rowCountPerChunk,
-      int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context) {
+      int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context, boolean isDictionaryEncoded,
+      FieldSpec.DataType valueType) {
     _memoryManager = memoryManager;
     _context = context;
     int initialCapacity = Math.max(maxNumberOfMultiValuesPerRow, rowCountPerChunk * avgMultiValueCount);
@@ -137,6 +141,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     _incrementalCapacity = incrementalCapacity;
     addDataBuffer(initialCapacity);
     //init(_rowCountPerChunk, _columnSizeInBytes, _maxNumberOfMultiValuesPerRow, initialCapacity, _incrementalCapacity);
+    _isDictionaryEncoded = isDictionaryEncoded;
+    _valueType = valueType;
   }
 
   private void addHeaderBuffer() {
@@ -206,12 +212,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     return newStartIndex;
   }
 
-  /**
-   * TODO: Currently we only support dictionary-encoded forward index on multi-value columns.
-   */
   @Override
   public boolean isDictionaryEncoded() {
-    return true;
+    return _isDictionaryEncoded;
   }
 
   @Override
@@ -221,7 +224,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
 
   @Override
   public DataType getValueType() {
-    return DataType.INT;
+    return _valueType;
   }
 
   @Override
@@ -248,11 +251,31 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     return length;
   }
 
+  @Override
+  public int[] getDictIdMV(int docId) {
+    FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+    int rowInCurrentHeader = getRowInCurrentHeader(docId);
+    int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+    int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+    int length = headerReader.getInt(rowInCurrentHeader, 2);
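+    // Header row layout per docId: column 0 = data buffer index, column 1 = start index within that buffer,
+    // column 2 = number of values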
+    FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+    int[] dictIdBuffer = new int[length];
+    for (int i = 0; i < length; i++) {
+      dictIdBuffer[i] = dataReader.getInt(startIndex + i, 0);
+    }
+    return dictIdBuffer;
+  }
+
   @Override
   public int getIntMV(int docId, int[] valueBuffer) {
     return getDictIdMV(docId, valueBuffer);
   }
 
+  @Override
+  public int[] getIntMV(int docId) {
+    return getDictIdMV(docId);
+  }
+
   @Override
   public int getLongMV(int docId, long[] valueBuffer) {
     FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -267,6 +290,21 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     return length;
   }
 
+  @Override
+  public long[] getLongMV(int docId) {
+    FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+    int rowInCurrentHeader = getRowInCurrentHeader(docId);
+    int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+    int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+    int length = headerReader.getInt(rowInCurrentHeader, 2);
+    FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+    long[] valueBuffer = new long[length];
+    for (int i = 0; i < length; i++) {
+      valueBuffer[i] = dataReader.getLong(startIndex + i, 0);
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getFloatMV(int docId, float[] valueBuffer) {
     FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -281,6 +319,21 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     return length;
   }
 
+  @Override
+  public float[] getFloatMV(int docId) {
+    FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+    int rowInCurrentHeader = getRowInCurrentHeader(docId);
+    int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+    int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+    int length = headerReader.getInt(rowInCurrentHeader, 2);
+    FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+    float[] valueBuffer = new float[length];
+    for (int i = 0; i < length; i++) {
+      valueBuffer[i] = dataReader.getFloat(startIndex + i, 0);
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getDoubleMV(int docId, double[] valueBuffer) {
     FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -295,6 +348,28 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
     return length;
   }
 
+  @Override
+  public double[] getDoubleMV(int docId) {
+    FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+    int rowInCurrentHeader = getRowInCurrentHeader(docId);
+    int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+    int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+    int length = headerReader.getInt(rowInCurrentHeader, 2);
+    FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+    double[] valueBuffer = new double[length];
+    for (int i = 0; i < length; i++) {
+      valueBuffer[i] = dataReader.getDouble(startIndex + i, 0);
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getNumValuesMV(int docId) {
+    FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+    int rowInCurrentHeader = getRowInCurrentHeader(docId);
+    return headerReader.getInt(rowInCurrentHeader, 2);
+  }
+
   @Override
   public void setDictIdMV(int docId, int[] dictIds) {
     int newStartIndex = updateHeader(docId, dictIds.length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index ef8483ee7d..82f5bff1ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -77,7 +77,8 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
       boolean deriveNumDocsPerChunk, int writerVersion)
       throws IOException {
     File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
-    int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size();
+    // Each entry stores the number of values followed by the values themselves
+    int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
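+    // e.g. an INT column with maxNumberOfMultiValueElements = 100 needs 4 + 100 * 4 = 404 bytes per entry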
     int numDocsPerChunk =
         deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
             + VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
@@ -152,7 +153,7 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
   public void putDoubleMV(final double[] values) {
 
     byte[] bytes = new byte[Integer.BYTES
-        + values.length * Long.BYTES]; //numValues, bytes required to store the content
+        + values.length * Double.BYTES]; // numValues followed by the bytes required to store the values
     ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
     //write the length
     byteBuffer.putInt(values.length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
index 36b32b8bd2..3882de26cf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
@@ -49,6 +49,16 @@ public final class ConstantMVForwardIndexReader implements ForwardIndexReader<Fo
     return 1;
   }
 
+  @Override
+  public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+    return new int[]{0};
+  }
+
+  @Override
+  public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+    return 1;
+  }
+
   @Override
   public void close() {
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
similarity index 92%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index 289cfe3af5..6991996d8a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Base implementation for chunk-based single-value raw (non-dictionary-encoded) forward index reader.
+ * Base implementation for chunk-based raw (non-dictionary-encoded) forward index reader.
  */
-public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);
+public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkForwardIndexReader.class);
 
   protected final PinotDataBuffer _dataBuffer;
   protected final DataType _valueType;
@@ -52,8 +52,9 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
   protected final PinotDataBuffer _dataHeader;
   protected final int _headerEntryChunkOffsetSize;
   protected final PinotDataBuffer _rawData;
+  protected final boolean _isSingleValue;
 
-  public BaseChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+  public BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType, boolean isSingleValue) {
     _dataBuffer = dataBuffer;
     _valueType = valueType;
 
@@ -68,7 +69,7 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
     headerOffset += Integer.BYTES;
 
     _lengthOfLongestEntry = _dataBuffer.getInt(headerOffset);
-    if (valueType.isFixedWidth()) {
+    if (valueType.isFixedWidth() && isSingleValue) {
       Preconditions.checkState(_lengthOfLongestEntry == valueType.size());
     }
     headerOffset += Integer.BYTES;
@@ -98,6 +99,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
     // Useful for uncompressed data.
     _rawData = _dataBuffer.view(rawDataStart, _dataBuffer.size());
+
+    _isSingleValue = isSingleValue;
   }
 
   /**
@@ -163,7 +166,7 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
   @Override
   public boolean isSingleValue() {
-    return true;
+    return _isSingleValue;
   }
 
   @Override
@@ -173,8 +176,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
   @Override
   public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
-    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
-      switch (getValueType()) {
+    if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (_valueType) {
         case INT: {
           int minOffset = docIds[0] * Integer.BYTES;
           IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -215,8 +218,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
   @Override
   public void readValuesSV(int[] docIds, int length, long[] values, ChunkReaderContext context) {
-    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
-      switch (getValueType()) {
+    if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (_valueType) {
         case INT: {
           int minOffset = docIds[0] * Integer.BYTES;
           IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -257,8 +260,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
   @Override
   public void readValuesSV(int[] docIds, int length, float[] values, ChunkReaderContext context) {
-    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
-      switch (getValueType()) {
+    if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (_valueType) {
         case INT: {
           int minOffset = docIds[0] * Integer.BYTES;
           IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -299,8 +302,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
 
   @Override
   public void readValuesSV(int[] docIds, int length, double[] values, ChunkReaderContext context) {
-    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
-      switch (getValueType()) {
+    if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (_valueType) {
         case INT: {
           int minOffset = docIds[0] * Integer.BYTES;
           IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
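The readValuesSV fast path above only kicks in for fixed-width, uncompressed data when the requested docIds form a contiguous range, in which case values can be bulk-copied straight out of _rawData. The isContiguousRange helper is not shown in this hunk; a sketch with the assumed semantics:

    // Assumes docIds are sorted in ascending order, so checking the endpoints suffices.
    private static boolean isContiguousRange(int[] docIds, int length) {
      return docIds[length - 1] - docIds[0] == length - 1;
    }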
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
index 784bcf2780..d77de108ff 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
@@ -132,6 +132,78 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader<Fi
     return numValues;
   }
 
+  @Override
+  public int[] getDictIdMV(int docId, Context context) {
+    int contextDocId = context._docId;
+    int contextEndOffset = context._endOffset;
+    int startIndex;
+    if (docId == contextDocId + 1) {
+      startIndex = contextEndOffset;
+    } else {
+      int chunkId = docId / _numDocsPerChunk;
+      if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) {
+        // Same chunk
+        startIndex = _bitmapReader.getNextNthSetBitOffset(contextEndOffset + 1, docId - contextDocId - 1);
+      } else {
+        // Different chunk
+        int chunkOffset = _chunkOffsetReader.getInt(chunkId);
+        int indexInChunk = docId % _numDocsPerChunk;
+        if (indexInChunk == 0) {
+          startIndex = chunkOffset;
+        } else {
+          startIndex = _bitmapReader.getNextNthSetBitOffset(chunkOffset + 1, indexInChunk);
+        }
+      }
+    }
+    int endIndex;
+    if (docId == _numDocs - 1) {
+      endIndex = _numValues;
+    } else {
+      endIndex = _bitmapReader.getNextSetBitOffset(startIndex + 1);
+    }
+    int numValues = endIndex - startIndex;
+    int[] dictIdBuffer = new int[numValues];
+    _rawDataReader.readInt(startIndex, numValues, dictIdBuffer);
+
+    // Update context
+    context._docId = docId;
+    context._endOffset = endIndex;
+
+    return dictIdBuffer;
+  }
+
+  @Override
+  public int getNumValuesMV(int docId, Context context) {
+    int contextDocId = context._docId;
+    int contextEndOffset = context._endOffset;
+    int startIndex;
+    if (docId == contextDocId + 1) {
+      startIndex = contextEndOffset;
+    } else {
+      int chunkId = docId / _numDocsPerChunk;
+      if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) {
+        // Same chunk
+        startIndex = _bitmapReader.getNextNthSetBitOffset(contextEndOffset + 1, docId - contextDocId - 1);
+      } else {
+        // Different chunk
+        int chunkOffset = _chunkOffsetReader.getInt(chunkId);
+        int indexInChunk = docId % _numDocsPerChunk;
+        if (indexInChunk == 0) {
+          startIndex = chunkOffset;
+        } else {
+          startIndex = _bitmapReader.getNextNthSetBitOffset(chunkOffset + 1, indexInChunk);
+        }
+      }
+    }
+    int endIndex;
+    if (docId == _numDocs - 1) {
+      endIndex = _numValues;
+    } else {
+      endIndex = _bitmapReader.getNextSetBitOffset(startIndex + 1);
+    }
+    return endIndex - startIndex;
+  }
+
   @Override
   public void close() {
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
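In the fixed-bit MV reader, a bitmap over the flattened values marks the first value of each document: the entry for a docId starts at its set bit and ends just before the next set bit (or at _numValues for the last doc), so the value count is simply endIndex - startIndex. A small worked example of that encoding (values and offsets are illustrative):

    // Bitmap over the flattened values (1 = first value of a document):
    //   value index: 0  1  2  3  4  5
    //   bitmap:      1  0  0  1  0  1
    // doc0 -> [0, 3): 3 values; doc1 -> [3, 5): 2 values; doc2 -> [5, 6): 1 value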
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
index b2e745d11d..acbec1a98d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -30,14 +30,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * fixed length data type (INT, LONG, FLOAT, DOUBLE).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
-public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
 
   private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
 
-  public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
-    super(dataBuffer, valueType);
+  public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType) {
+    super(dataBuffer, storedType, false);
     _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
@@ -61,6 +61,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
     return numValues;
   }
 
+  @Override
+  public int[] getIntMV(int docId, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    int[] valueBuffer = new int[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getInt();
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
     ByteBuffer byteBuffer = slice(docId, context);
@@ -71,6 +82,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
     return numValues;
   }
 
+  @Override
+  public long[] getLongMV(int docId, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    long[] valueBuffer = new long[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getLong();
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
     ByteBuffer byteBuffer = slice(docId, context);
@@ -81,6 +103,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
     return numValues;
   }
 
+  @Override
+  public float[] getFloatMV(int docId, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    float[] valueBuffer = new float[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getFloat();
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
     ByteBuffer byteBuffer = slice(docId, context);
@@ -91,6 +124,23 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
     return numValues;
   }
 
+  @Override
+  public double[] getDoubleMV(int docId, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    int numValues = byteBuffer.getInt();
+    double[] valueBuffer = new double[numValues];
+    for (int i = 0; i < numValues; i++) {
+      valueBuffer[i] = byteBuffer.getDouble();
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getNumValuesMV(int docId, ChunkReaderContext context) {
+    ByteBuffer byteBuffer = slice(docId, context);
+    return byteBuffer.getInt();
+  }
+
   private ByteBuffer slice(int docId, ChunkReaderContext context) {
     if (_isCompressed) {
       return sliceBytesCompressed(docId, context);
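Each array-returning getter above slices out the entry for the doc, reads the 4-byte value count, then reads that many fixed-width values; getNumValuesMV stops after the count and never touches the values. A hedged decode sketch of the same entry layout, independent of the reader and its chunk handling:

    import java.nio.ByteBuffer;

    class FixedWidthMvDecodeSketch {
      // Layout: [numValues:int][value_0 .. value_{n-1}], each Long.BYTES wide
      static long[] decodeLongMv(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        int numValues = buf.getInt(); // length prefix
        long[] values = new long[numValues];
        for (int i = 0; i < numValues; i++) {
          values[i] = buf.getLong();
        }
        return values;
      }
    }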
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
index 94d8a9a8a6..c394d0021a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
@@ -30,11 +30,11 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * LONG, FLOAT, DOUBLE).
  * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
  */
-public final class FixedByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedByteChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
   private final int _chunkSize;
 
   public FixedByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
-    super(dataBuffer, valueType);
+    super(dataBuffer, valueType, true);
     _chunkSize = _numDocsPerChunk * _lengthOfLongestEntry;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
index 0effd62c7d..0887442804 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -30,13 +30,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * LONG, FLOAT, DOUBLE).
  * <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
  */
-public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
   public static final int VERSION = 4;
 
   private final int _shift;
 
   public FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
-    super(dataBuffer, valueType);
+    super(dataBuffer, valueType, true);
     _shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
index 1844957f74..c445312f6f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -32,14 +32,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
  * (STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
-public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
 
   private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
 
   public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
-    super(dataBuffer, valueType);
+    super(dataBuffer, valueType, false);
     _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
@@ -76,6 +76,29 @@ public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIn
     return numValues;
   }
 
+  @Override
+  public String[] getStringMV(final int docId, final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    String[] valueBuffer = new String[numValues];
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+      contentOffset += length;
+    }
+    return valueBuffer;
+  }
+
   @Override
   public int getBytesMV(final int docId, final byte[][] valueBuffer,
       final ChunkReaderContext context) {
@@ -99,6 +122,41 @@ public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIn
     return numValues;
   }
 
+  @Override
+  public byte[][] getBytesMV(final int docId, final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    int numValues = byteBuffer.getInt();
+    int contentOffset = (numValues + 1) * Integer.BYTES;
+    byte[][] valueBuffer = new byte[numValues][];
+    for (int i = 0; i < numValues; i++) {
+      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+      byte[] bytes = new byte[length];
+      byteBuffer.position(contentOffset);
+      byteBuffer.get(bytes, 0, length);
+      valueBuffer[i] = bytes;
+      contentOffset += length;
+    }
+    return valueBuffer;
+  }
+
+  @Override
+  public int getNumValuesMV(final int docId, final ChunkReaderContext context) {
+    byte[] compressedBytes;
+    if (_isCompressed) {
+      compressedBytes = getBytesCompressed(docId, context);
+    } else {
+      compressedBytes = getBytesUncompressed(docId);
+    }
+    ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+    return byteBuffer.getInt();
+  }
+
   @Override
   public byte[] getBytes(int docId, ChunkReaderContext context) {
     if (_isCompressed) {
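The var-byte MV entry stores a value count, then one int length per value, then the concatenated value bytes, which is why getStringMV starts its content cursor at (numValues + 1) * Integer.BYTES. A standalone decode sketch of that layout:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    class VarByteMvDecodeSketch {
      // Layout: [numValues:int][len_0 .. len_{n-1}:int][bytes_0 .. bytes_{n-1}]
      static String[] decodeStringMv(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        int numValues = buf.getInt();
        int contentOffset = (numValues + 1) * Integer.BYTES;
        String[] values = new String[numValues];
        for (int i = 0; i < numValues; i++) {
          int length = buf.getInt((i + 1) * Integer.BYTES);
          byte[] bytes = new byte[length];
          buf.position(contentOffset);
          buf.get(bytes, 0, length);
          values[i] = new String(bytes, StandardCharsets.UTF_8);
          contentOffset += length;
        }
        return values;
      }
    }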
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index c993855ded..4a69f73463 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -34,7 +34,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
  * (BIG_DECIMAL, STRING, BYTES).
  * <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
  */
-public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class VarByteChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
   private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
 
   private final int _maxChunkSize;
@@ -43,7 +43,7 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIn
   private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
 
   public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
-    super(dataBuffer, valueType);
+    super(dataBuffer, valueType, true);
     _maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index b3c5be27b9..c966e8318a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
-import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -98,7 +98,7 @@ public class StarTreeLoaderUtils {
         PinotDataBuffer forwardIndexDataBuffer = dataBuffer.view(start, end, ByteOrder.BIG_ENDIAN);
         DataType dataType = ValueAggregatorFactory.getAggregatedValueType(functionColumnPair.getFunctionType());
         FieldSpec fieldSpec = new MetricFieldSpec(metric, dataType);
-        BaseChunkSVForwardIndexReader forwardIndex;
+        BaseChunkForwardIndexReader forwardIndex;
         if (dataType == DataType.BYTES) {
           forwardIndex = new VarByteChunkSVForwardIndexReader(forwardIndexDataBuffer, DataType.BYTES);
         } else {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
new file mode 100644
index 0000000000..51e9e094d9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+public class MutableSegmentImplRawMVTest {
+  private static final String AVRO_FILE = "data/test_data-mv.avro";
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "MutableSegmentImplRawMVTest");
+
+  private Schema _schema;
+  private MutableSegmentImpl _mutableSegmentImpl;
+  private ImmutableSegment _immutableSegment;
+  private long _lastIndexedTs;
+  private long _lastIngestionTimeMs;
+  private long _startTimeMs;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+
+    URL resourceUrl = MutableSegmentImplTest.class.getClassLoader().getResource(AVRO_FILE);
+    Assert.assertNotNull(resourceUrl);
+    File avroFile = new File(resourceUrl.getFile());
+
+    SegmentGeneratorConfig config =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable");
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+    _schema = config.getSchema();
+    Set<String> noDictionaryColumns = new HashSet<>();
+    List<String> noDictionaryColumnsList = new ArrayList<>();
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isSingleValueField() && fieldSpec.getDataType().isFixedWidth()) {
+        noDictionaryColumns.add(fieldSpec.getName());
+        noDictionaryColumnsList.add(fieldSpec.getName());
+      }
+    }
+    config.setRawIndexCreationColumns(noDictionaryColumnsList);
+    assertEquals(noDictionaryColumns.size(), 2);
+
+    driver.init(config);
+    driver.build();
+    _immutableSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, driver.getSegmentName()), ReadMode.mmap);
+
+    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment");
+    _mutableSegmentImpl = MutableSegmentImplTestUtils
+        .createMutableSegmentImpl(_schema, noDictionaryColumns, Collections.emptySet(), Collections.emptySet(),
+            false);
+    _lastIngestionTimeMs = System.currentTimeMillis();
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
+    _startTimeMs = System.currentTimeMillis();
+
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
+      GenericRow reuse = new GenericRow();
+      while (recordReader.hasNext()) {
+        _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
+        _lastIndexedTs = System.currentTimeMillis();
+      }
+    }
+  }
+
+  @Test
+  public void testMetadata() {
+    SegmentMetadata actualSegmentMetadata = _mutableSegmentImpl.getSegmentMetadata();
+    SegmentMetadata expectedSegmentMetadata = _immutableSegment.getSegmentMetadata();
+    assertEquals(actualSegmentMetadata.getTotalDocs(), expectedSegmentMetadata.getTotalDocs());
+
+    // assert that the last indexed timestamp is close to what we expect
+    long actualTs = _mutableSegmentImpl.getSegmentMetadata().getLastIndexedTimestamp();
+    Assert.assertTrue(actualTs >= _startTimeMs);
+    Assert.assertTrue(actualTs <= _lastIndexedTs);
+
+    assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(), _lastIngestionTimeMs);
+
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      String column = fieldSpec.getName();
+      DataSourceMetadata actualDataSourceMetadata = _mutableSegmentImpl.getDataSource(column).getDataSourceMetadata();
+      DataSourceMetadata expectedDataSourceMetadata = _immutableSegment.getDataSource(column).getDataSourceMetadata();
+      assertEquals(actualDataSourceMetadata.getDataType(), expectedDataSourceMetadata.getDataType());
+      assertEquals(actualDataSourceMetadata.isSingleValue(), expectedDataSourceMetadata.isSingleValue());
+      assertEquals(actualDataSourceMetadata.getNumDocs(), expectedDataSourceMetadata.getNumDocs());
+      if (!expectedDataSourceMetadata.isSingleValue()) {
+        assertEquals(actualDataSourceMetadata.getMaxNumValuesPerMVEntry(),
+            expectedDataSourceMetadata.getMaxNumValuesPerMVEntry());
+      }
+    }
+  }
+
+  @Test
+  public void testDataSourceForSVColumns()
+      throws IOException {
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (fieldSpec.isSingleValueField()) {
+        String column = fieldSpec.getName();
+        DataSource actualDataSource = _mutableSegmentImpl.getDataSource(column);
+        DataSource expectedDataSource = _immutableSegment.getDataSource(column);
+
+        int actualNumDocs = actualDataSource.getDataSourceMetadata().getNumDocs();
+        int expectedNumDocs = expectedDataSource.getDataSourceMetadata().getNumDocs();
+        assertEquals(actualNumDocs, expectedNumDocs);
+
+        Dictionary actualDictionary = actualDataSource.getDictionary();
+        Dictionary expectedDictionary = expectedDataSource.getDictionary();
+        assertEquals(actualDictionary.length(), expectedDictionary.length());
+
+        // Allow the segment name to be different
+        if (column.equals(CommonConstants.Segment.BuiltInVirtualColumn.SEGMENTNAME)) {
+          continue;
+        }
+
+        ForwardIndexReader actualReader = actualDataSource.getForwardIndex();
+        ForwardIndexReader expectedReader = expectedDataSource.getForwardIndex();
+        try (ForwardIndexReaderContext actualReaderContext = actualReader.createContext();
+            ForwardIndexReaderContext expectedReaderContext = expectedReader.createContext()) {
+          for (int docId = 0; docId < expectedNumDocs; docId++) {
+            int actualDictId = actualReader.getDictId(docId, actualReaderContext);
+            int expectedDictId = expectedReader.getDictId(docId, expectedReaderContext);
+            assertEquals(actualDictionary.get(actualDictId), expectedDictionary.get(expectedDictId));
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testDataSourceForMVColumns()
+      throws IOException {
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isSingleValueField()) {
+        String column = fieldSpec.getName();
+        DataSource actualDataSource = _mutableSegmentImpl.getDataSource(column);
+        DataSource expectedDataSource = _immutableSegment.getDataSource(column);
+
+        int actualNumDocs = actualDataSource.getDataSourceMetadata().getNumDocs();
+        int expectedNumDocs = expectedDataSource.getDataSourceMetadata().getNumDocs();
+        assertEquals(actualNumDocs, expectedNumDocs);
+
+        Dictionary actualDictionary = actualDataSource.getDictionary();
+        Dictionary expectedDictionary = expectedDataSource.getDictionary();
+        assertNull(actualDictionary);
+        assertNull(expectedDictionary);
+
+        int maxNumValuesPerMVEntry = expectedDataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+
+        ForwardIndexReader actualReader = actualDataSource.getForwardIndex();
+        ForwardIndexReader expectedReader = expectedDataSource.getForwardIndex();
+        try (ForwardIndexReaderContext actualReaderContext = actualReader.createContext();
+            ForwardIndexReaderContext expectedReaderContext = expectedReader.createContext()) {
+          for (int docId = 0; docId < expectedNumDocs; docId++) {
+            switch (fieldSpec.getDataType()) {
+              case INT:
+                int[] actualInts = new int[maxNumValuesPerMVEntry];
+                int[] expectedInts = new int[maxNumValuesPerMVEntry];
+                int actualLength = actualReader.getIntMV(docId, actualInts, actualReaderContext);
+                int expectedLength = expectedReader.getIntMV(docId, expectedInts, expectedReaderContext);
+                assertEquals(actualLength, expectedLength);
+
+                for (int i = 0; i < actualLength; i++) {
+                  assertEquals(actualInts[i], expectedInts[i]);
+                }
+                break;
+              case LONG:
+                long[] actualLongs = new long[maxNumValuesPerMVEntry];
+                long[] expectedLongs = new long[maxNumValuesPerMVEntry];
+                actualLength = actualReader.getLongMV(docId, actualLongs, actualReaderContext);
+                expectedLength = expectedReader.getLongMV(docId, expectedLongs, expectedReaderContext);
+                assertEquals(actualLength, expectedLength);
+
+                for (int i = 0; i < actualLength; i++) {
+                  assertEquals(actualLongs[i], expectedLongs[i]);
+                }
+                break;
+              case FLOAT:
+                float[] actualFloats = new float[maxNumValuesPerMVEntry];
+                float[] expectedFloats = new float[maxNumValuesPerMVEntry];
+                actualLength = actualReader.getFloatMV(docId, actualFloats, actualReaderContext);
+                expectedLength = expectedReader.getFloatMV(docId, expectedFloats, expectedReaderContext);
+                assertEquals(actualLength, expectedLength);
+
+                for (int i = 0; i < actualLength; i++) {
+                  assertEquals(actualFloats[i], expectedFloats[i]);
+                }
+                break;
+              case DOUBLE:
+                double[] actualDoubles = new double[maxNumValuesPerMVEntry];
+                double[] expectedDoubles = new double[maxNumValuesPerMVEntry];
+                actualLength = actualReader.getDoubleMV(docId, actualDoubles, actualReaderContext);
+                expectedLength = expectedReader.getDoubleMV(docId, expectedDoubles, expectedReaderContext);
+                assertEquals(actualLength, expectedLength);
+
+                for (int i = 0; i < actualLength; i++) {
+                  assertEquals(actualDoubles[i], expectedDoubles[i]);
+                }
+                break;
+              default:
+                // TODO: add support for BYTES, STRING, and BIG_DECIMAL type MV raw columns
+                throw new UnsupportedOperationException("No support for raw MV variable length columns yet");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
index 953b48ef60..0dcf5492d4 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
@@ -22,11 +22,13 @@ import java.util.Random;
 import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.fail;
 
 
@@ -47,11 +49,12 @@ public class MultiValueDictionaryTest {
   }
 
   @Test
-  public void testMultiValueIndexing() {
+  public void testMultiValueIndexingWithDictionary() {
     long seed = System.nanoTime();
     try (LongOnHeapMutableDictionary dict = new LongOnHeapMutableDictionary();
         FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
-            NROWS / 3, Integer.BYTES, new DirectMemoryManager("test"), "indexer")) {
+            NROWS / 3, Integer.BYTES, new DirectMemoryManager("test"), "indexer",
+            true, FieldSpec.DataType.INT)) {
       // Insert rows into the indexer and dictionary
       Random random = new Random(seed);
       for (int row = 0; row < NROWS; row++) {
@@ -81,4 +84,166 @@ public class MultiValueDictionaryTest {
       fail("Failed with random seed: " + seed, t);
     }
   }
+
+  @Test
+  public void testMultiValueIndexingWithRawInt() {
+    long seed = System.nanoTime();
+    try (FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, Integer.BYTES, new DirectMemoryManager("test"), "indexer",
+            false, FieldSpec.DataType.INT)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = Math.abs(random.nextInt()) % MAX_N_VALUES;
+        int[] values = new int[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextInt();
+        }
+        indexer.setIntMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good.
+      random = new Random(seed);
+      int[] intValues = new int[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getIntMV(row, intValues);
+        assertEquals(numValues, Math.abs(random.nextInt()) % MAX_N_VALUES);
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(intValues[i], random.nextInt());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  @Test
+  public void testMultiValueIndexingWithRawLong() {
+    long seed = System.nanoTime();
+    try (FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+        NROWS / 3, Long.BYTES, new DirectMemoryManager("test"), "indexer",
+        false, FieldSpec.DataType.LONG)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = Math.abs(random.nextInt()) % MAX_N_VALUES;
+        long[] values = new long[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextLong();
+        }
+        indexer.setLongMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good.
+      random = new Random(seed);
+      long[] longValues = new long[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getLongMV(row, longValues);
+        assertEquals(numValues, Math.abs(random.nextInt()) % MAX_N_VALUES);
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(longValues[i], random.nextLong());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  @Test
+  public void testMultiValueIndexingWithRawFloat() {
+    long seed = System.nanoTime();
+    try (FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+        NROWS / 3, Float.BYTES, new DirectMemoryManager("test"), "indexer",
+        false, FieldSpec.DataType.FLOAT)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = Math.abs(random.nextInt()) % MAX_N_VALUES;
+        float[] values = new float[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextFloat();
+        }
+        indexer.setFloatMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good.
+      random = new Random(seed);
+      float[] floatValues = new float[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getFloatMV(row, floatValues);
+        assertEquals(numValues, Math.abs(random.nextInt()) % MAX_N_VALUES);
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(floatValues[i], random.nextFloat());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  @Test
+  public void testMultiValueIndexingWithRawDouble() {
+    long seed = System.nanoTime();
+    try (FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+        NROWS / 3, Double.BYTES, new DirectMemoryManager("test"), "indexer",
+        false, FieldSpec.DataType.DOUBLE)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = Math.abs(random.nextInt()) % MAX_N_VALUES;
+        double[] values = new double[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextDouble();
+        }
+        indexer.setDoubleMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good.
+      random = new Random(seed);
+      double[] doubleValues = new double[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getDoubleMV(row, doubleValues);
+        assertEquals(numValues, Math.abs(random.nextInt()) % MAX_N_VALUES);
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(doubleValues[i], random.nextDouble());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  @Test
+  public void testMultiValueIndexingWithRawString() {
+    long seed = System.nanoTime();
+    try (FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+        NROWS / 3, 24, new DirectMemoryManager("test"), "indexer",
+        false, FieldSpec.DataType.STRING)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = Math.abs(random.nextInt()) % MAX_N_VALUES;
+        String[] values = new String[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = "random1";
+        }
+        final int curRow = row;
+        assertThrows(UnsupportedOperationException.class, () -> indexer.setStringMV(curRow, values));
+      }
+
+      // Read back rows and make sure that the values are good.
+      random = new Random(seed);
+      String[] stringValues = new String[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        final int curRow = row;
+        assertThrows(UnsupportedOperationException.class, () -> indexer.getStringMV(curRow, stringValues));
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index fb49dce6c4..881aa495a8 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -125,7 +125,8 @@ public class MultiValueFixedByteRawIndexCreatorTest {
     //read
     final PinotDataBuffer buffer = PinotDataBuffer
         .mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
-    FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.BYTES);
+    FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer,
+        dataType.getStoredType());
     final ChunkReaderContext context = reader.createContext();
     T valueBuffer = constructor.apply(maxElements);
     for (int i = 0; i < numDocs; i++) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
index 53fded69be..3268277562 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
@@ -50,15 +50,18 @@ public class FixedByteMVMutableForwardIndexTest {
     Random r = new Random();
     final long seed = r.nextLong();
     try {
-      testIntArray(seed);
-      testWithZeroSize(seed);
+      testIntArray(seed, true);
+      testIntArray(seed, false);
+      testWithZeroSize(seed, true);
+      testWithZeroSize(seed, false);
     } catch (Throwable e) {
       e.printStackTrace();
       Assert.fail("Failed with seed " + seed);
     }
     for (int mvs = 10; mvs < 1000; mvs += 10) {
       try {
-        testIntArrayFixedSize(mvs, seed);
+        testIntArrayFixedSize(mvs, seed, true);
+        testIntArrayFixedSize(mvs, seed, false);
       } catch (Throwable e) {
         e.printStackTrace();
         Assert.fail("Failed with seed " + seed + ", mvs " + mvs);
@@ -66,7 +69,7 @@ public class FixedByteMVMutableForwardIndexTest {
     }
   }
 
-  public void testIntArray(final long seed)
+  public void testIntArray(final long seed, boolean isDictionaryEncoded)
       throws IOException {
     FixedByteMVMutableForwardIndex readerWriter;
     int rows = 1000;
@@ -74,7 +77,7 @@ public class FixedByteMVMutableForwardIndexTest {
     int maxNumberOfMultiValuesPerRow = 2000;
     readerWriter =
         new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 2, rows / 2, columnSizeInBytes, _memoryManager,
-            "IntArray");
+            "IntArray", isDictionaryEncoded, FieldSpec.DataType.INT);
 
     Random r = new Random(seed);
     int[][] data = new int[rows][];
@@ -94,7 +97,7 @@ public class FixedByteMVMutableForwardIndexTest {
     readerWriter.close();
   }
 
-  public void testIntArrayFixedSize(int multiValuesPerRow, long seed)
+  public void testIntArrayFixedSize(int multiValuesPerRow, long seed, boolean isDictionaryEncoded)
       throws IOException {
     FixedByteMVMutableForwardIndex readerWriter;
     int rows = 1000;
@@ -102,7 +105,7 @@ public class FixedByteMVMutableForwardIndexTest {
     // Keep the rowsPerChunk as a multiple of multiValuesPerRow to check the cases when both data and header buffers
     // transition to new ones
     readerWriter = new FixedByteMVMutableForwardIndex(multiValuesPerRow, multiValuesPerRow, multiValuesPerRow * 2,
-        columnSizeInBytes, _memoryManager, "IntArrayFixedSize");
+        columnSizeInBytes, _memoryManager, "IntArrayFixedSize", isDictionaryEncoded, FieldSpec.DataType.INT);
 
     Random r = new Random(seed);
     int[][] data = new int[rows][];
@@ -122,7 +125,7 @@ public class FixedByteMVMutableForwardIndexTest {
     readerWriter.close();
   }
 
-  public void testWithZeroSize(long seed)
+  public void testWithZeroSize(long seed, boolean isDictionaryEncoded)
       throws IOException {
     FixedByteMVMutableForwardIndex readerWriter;
     final int maxNumberOfMultiValuesPerRow = 5;
@@ -131,7 +134,7 @@ public class FixedByteMVMutableForwardIndexTest {
     Random r = new Random(seed);
     readerWriter =
         new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 3, r.nextInt(rows) + 1, columnSizeInBytes,
-            _memoryManager, "ZeroSize");
+            _memoryManager, "ZeroSize", isDictionaryEncoded, FieldSpec.DataType.INT);
 
     int[][] data = new int[rows][];
     for (int i = 0; i < rows; i++) {
@@ -156,12 +159,12 @@ public class FixedByteMVMutableForwardIndexTest {
   }
 
   private FixedByteMVMutableForwardIndex createReaderWriter(FieldSpec.DataType dataType, Random r, int rows,
-      int maxNumberOfMultiValuesPerRow) {
+      int maxNumberOfMultiValuesPerRow, boolean isDictionaryEncoded) {
     final int avgMultiValueCount = r.nextInt(maxNumberOfMultiValuesPerRow) + 1;
     final int rowCountPerChunk = r.nextInt(rows) + 1;
 
     return new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, avgMultiValueCount, rowCountPerChunk,
-        dataType.size(), _memoryManager, "ReaderWriter");
+        dataType.size(), _memoryManager, "ReaderWriter", isDictionaryEncoded, dataType);
   }
 
   private long generateSeed() {
@@ -172,12 +175,18 @@ public class FixedByteMVMutableForwardIndexTest {
   @Test
   public void testLongArray()
       throws IOException {
+    testLongArray(true);
+    testLongArray(false);
+  }
+
+  private void testLongArray(boolean isDictionaryEncoded)
+      throws IOException {
     final long seed = generateSeed();
     Random r = new Random(seed);
     int rows = 1000;
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
-        createReaderWriter(FieldSpec.DataType.LONG, r, rows, maxNumberOfMultiValuesPerRow);
+        createReaderWriter(FieldSpec.DataType.LONG, r, rows, maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
 
     long[][] data = new long[rows][];
     for (int i = 0; i < rows; i++) {
@@ -204,12 +213,18 @@ public class FixedByteMVMutableForwardIndexTest {
   @Test
   public void testFloatArray()
       throws IOException {
+    testFloatArray(true);
+    testFloatArray(false);
+  }
+
+  private void testFloatArray(boolean isDictionaryEncoded)
+      throws IOException {
     final long seed = generateSeed();
     Random r = new Random(seed);
     int rows = 1000;
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
-        createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, maxNumberOfMultiValuesPerRow);
+        createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
 
     float[][] data = new float[rows][];
     for (int i = 0; i < rows; i++) {
@@ -236,12 +251,18 @@ public class FixedByteMVMutableForwardIndexTest {
   @Test
   public void testDoubleArray()
       throws IOException {
+    testDoubleArray(true);
+    testDoubleArray(false);
+  }
+
+  private void testDoubleArray(boolean isDictionaryEncoded)
+      throws IOException {
     final long seed = generateSeed();
     Random r = new Random(seed);
     int rows = 1000;
     final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
     FixedByteMVMutableForwardIndex readerWriter =
-        createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, maxNumberOfMultiValuesPerRow);
+        createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
 
     double[][] data = new double[rows][];
     for (int i = 0; i < rows; i++) {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
index cc84074ea0..38e2bc8539 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
@@ -92,11 +92,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the dictionary ids for a multi-value column at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default int[] getDictIdMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getDictIdMV(int docId, int[] dictIdBuffer, ForwardIndexReaderContext context) {
     return getDictIdMV(docId, dictIdBuffer);
   }
 
+  @Override
+  default int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+    return getDictIdMV(docId);
+  }
+
   /**
    * Writes the dictionary id for a single-value column into the given document id.
    *
@@ -321,11 +336,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the INT type multi-value at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default int[] getIntMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getIntMV(int docId, int[] valueBuffer, ForwardIndexReaderContext context) {
     return getIntMV(docId, valueBuffer);
   }
 
+  @Override
+  default int[] getIntMV(int docId, ForwardIndexReaderContext context) {
+    return getIntMV(docId);
+  }
+
   /**
    * Reads the LONG type multi-value at the given document id into the passed in value buffer (the buffer size must be
    * enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -339,11 +369,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the LONG type multi-value at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default long[] getLongMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getLongMV(int docId, long[] valueBuffer, ForwardIndexReaderContext context) {
     return getLongMV(docId, valueBuffer);
   }
 
+  @Override
+  default long[] getLongMV(int docId, ForwardIndexReaderContext context) {
+    return getLongMV(docId);
+  }
+
   /**
    * Reads the FLOAT type multi-value at the given document id into the passed in value buffer (the buffer size must be
    * enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -357,11 +402,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the FLOAT type multi-value at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default float[] getFloatMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getFloatMV(int docId, float[] valueBuffer, ForwardIndexReaderContext context) {
     return getFloatMV(docId, valueBuffer);
   }
 
+  @Override
+  default float[] getFloatMV(int docId, ForwardIndexReaderContext context) {
+    return getFloatMV(docId);
+  }
+
   /**
    * Reads the DOUBLE type multi-value at the given document id into the passed in value buffer (the buffer size must
    * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -375,11 +435,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the DOUBLE type multi-value at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default double[] getDoubleMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getDoubleMV(int docId, double[] valueBuffer, ForwardIndexReaderContext context) {
     return getDoubleMV(docId, valueBuffer);
   }
 
+  @Override
+  default double[] getDoubleMV(int docId, ForwardIndexReaderContext context) {
+    return getDoubleMV(docId);
+  }
+
   /**
    * Reads the STRING type multi-value at the given document id into the passed in value buffer (the buffer size must
    * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -393,11 +468,41 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the STRING type multi-value at the given document id into a buffer and returns the buffer.
+   *
+   * @param docId Document id
+   * @return A buffer containing the multi-value entries
+   */
+  default String[] getStringMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   default int getStringMV(int docId, String[] valueBuffer, ForwardIndexReaderContext context) {
     return getStringMV(docId, valueBuffer);
   }
 
+  @Override
+  default String[] getStringMV(int docId, ForwardIndexReaderContext context) {
+    return getStringMV(docId);
+  }
+
+  /**
+   * Returns the number of values in the multi-value entry at the given document id.
+   *
+   * @param docId Document id
+   * @return Number of values within the multi-value entry
+   */
+  default int getNumValuesMV(int docId) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  default int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+    return getNumValuesMV(docId);
+  }
+
   /**
    * Writes the INT type multi-value into the given document id.
    *
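(Editor's note: before the next file, a sketch of how an implementation might back the new allocating reads by reusing the existing fill-style reads; this is illustrative, not the actual FixedByteMVMutableForwardIndex code.)

    // Inside a MutableForwardIndex implementation (hypothetical):
    @Override
    public int[] getIntMV(int docId) {
      int numValues = getNumValuesMV(docId);  // length of this MV entry
      int[] values = new int[numValues];      // allocate a right-sized buffer
      getIntMV(docId, values);                // delegate to the existing fill API
      return values;
    }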
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 62e34dfc55..8db16ac496 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -100,6 +100,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the dictionary ids for a multi-value column at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return Dictionary ids at the given document id
+   */
+  default int[] getDictIdMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * SINGLE-VALUE COLUMN RAW INDEX APIs
    */
@@ -410,9 +421,308 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
 
   /**
    * MULTI-VALUE COLUMN RAW INDEX APIs
-   * TODO: Not supported yet
    */
 
+  /**
+   * Batch-reads the multi-value entries at the given document ids into the passed in values array. Entries of
+   * other stored types are converted to INT (numeric values are cast; STRING values are parsed).
+   *
+   * @param docIds Array containing the document ids to read
+   * @param length Number of document ids to read
+   * @param maxNumValuesPerMVEntry Maximum number of values per MV entry
+   * @param values Array of int arrays to fill with the values read
+   * @param context Reader context
+   */
+  default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, int[][] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getIntMV(docIds[i], context);
+        }
+        break;
+      case LONG:
+        long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getLongMV(docIds[i], longValueBuffer, context);
+          values[i] = new int[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (int) longValueBuffer[j];
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+          values[i] = new int[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (int) floatValueBuffer[j];
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+          values[i] = new int[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (int) doubleValueBuffer[j];
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+          values[i] = new int[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = Integer.parseInt(stringValueBuffer[j]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+    }
+  }
+
+  /**
+   * Batch-reads the multi-value entries at the given document ids into the passed in values array. Entries of
+   * other stored types are converted to LONG (numeric values are cast or widened; STRING values are parsed).
+   *
+   * @param docIds Array containing the document ids to read
+   * @param length Number of document ids to read
+   * @param maxNumValuesPerMVEntry Maximum number of values per MV entry
+   * @param values Array of long arrays to fill with the values read
+   * @param context Reader context
+   */
+  default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, long[][] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getIntMV(docIds[i], intValueBuffer, context);
+          values[i] = new long[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = intValueBuffer[j];
+          }
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = getLongMV(docIds[i], context);
+        }
+        break;
+      case FLOAT:
+        float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+          values[i] = new long[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (long) floatValueBuffer[j];
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+          values[i] = new long[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (long) doubleValueBuffer[j];
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+          values[i] = new long[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = Long.parseLong(stringValueBuffer[j]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+    }
+  }
+
+  /**
+   * Batch-reads the multi-value entries at the given document ids into the passed in values array. Entries of
+   * other stored types are converted to FLOAT (numeric values are cast or widened; STRING values are parsed).
+   *
+   * @param docIds Array containing the document ids to read
+   * @param length Number of document ids to read
+   * @param maxNumValuesPerMVEntry Maximum number of values per MV entry
+   * @param values Array of float arrays to fill with the values read
+   * @param context Reader context
+   */
+  default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, float[][] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getIntMV(docIds[i], intValueBuffer, context);
+          values[i] = new float[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = intValueBuffer[j];
+          }
+        }
+        break;
+      case LONG:
+        long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getLongMV(docIds[i], longValueBuffer, context);
+          values[i] = new float[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = longValueBuffer[j];
+          }
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getFloatMV(docIds[i], context);
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+          values[i] = new float[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = (float) doubleValueBuffer[j];
+          }
+        }
+        break;
+      case STRING:
+        String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+          values[i] = new float[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = Float.parseFloat(stringValueBuffer[j]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+    }
+  }
+
+  /**
+   * Batch-reads the multi-value entries at the given document ids into the passed in values array. Entries of
+   * other stored types are converted to DOUBLE (numeric values are widened; STRING values are parsed).
+   *
+   * @param docIds Array containing the document ids to read
+   * @param length Number of document ids to read
+   * @param maxNumValuesPerMVEntry Maximum number of values per MV entry
+   * @param values Array of double arrays to fill with the values read
+   * @param context Reader context
+   */
+  default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, double[][] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getIntMV(docIds[i], intValueBuffer, context);
+          values[i] = new double[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = intValueBuffer[j];
+          }
+        }
+        break;
+      case LONG:
+        long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getLongMV(docIds[i], longValueBuffer, context);
+          values[i] = new double[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = longValueBuffer[j];
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+          values[i] = new double[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = floatValueBuffer[j];
+          }
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < length; i++) {
+          values[i] = getDoubleMV(docIds[i], context);
+        }
+        break;
+      case STRING:
+        String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+          values[i] = new double[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = Double.parseDouble(stringValueBuffer[j]);
+          }
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+    }
+  }
+
+  /**
+   * Batch-reads the multi-value entries at the given document ids into the passed in values array. Entries of
+   * other stored types are converted to STRING via String.valueOf.
+   *
+   * @param docIds Array containing the document ids to read
+   * @param length Number of document ids to read
+   * @param maxNumValuesPerMVEntry Maximum number of values per MV entry
+   * @param values Array of String arrays to fill with the values read
+   * @param context Reader context
+   */
+  default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, String[][] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getIntMV(docIds[i], intValueBuffer, context);
+          values[i] = new String[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = String.valueOf(intValueBuffer[j]);
+          }
+        }
+        break;
+      case LONG:
+        long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getLongMV(docIds[i], longValueBuffer, context);
+          values[i] = new String[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = String.valueOf(longValueBuffer[j]);
+          }
+        }
+        break;
+      case FLOAT:
+        float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+          values[i] = new String[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = String.valueOf(floatValueBuffer[j]);
+          }
+        }
+        break;
+      case DOUBLE:
+        double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+        for (int i = 0; i < length; i++) {
+          int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+          values[i] = new String[numValues];
+          for (int j = 0; j < numValues; j++) {
+            values[i][j] = String.valueOf(doubleValueBuffer[j]);
+          }
+        }
+        break;
+      case STRING:
+        for (int i = 0; i < length; i++) {
+          values[i] = getStringMV(docIds[i], context);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+    }
+  }
+
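(Editor's note: a hedged usage sketch of the batch API above, in the style of a DataFetcher-like caller; the reader, context, and maxNumValuesPerMVEntry are assumed to come from segment metadata.)

    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
    import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;

    // Hypothetical helper: bulk-read raw MV entries as INT; stored LONG/FLOAT/
    // DOUBLE/STRING entries are coerced by the switch in readValuesMV above.
    static int[][] fetchIntMV(ForwardIndexReader<ForwardIndexReaderContext> reader,
        ForwardIndexReaderContext context, int[] docIds, int maxNumValuesPerMVEntry) {
      int[][] values = new int[docIds.length][];
      reader.readValuesMV(docIds, docIds.length, maxNumValuesPerMVEntry, values, context);
      return values;  // values[i] is right-sized for docIds[i]
    }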
   /**
    * Reads the INT type multi-value at the given document id into the passed in value buffer (the buffer size must be
    * enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -427,6 +737,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the INT type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return INT values at the given document id
+   */
+  default int[] getIntMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Reads the LONG type multi-value at the given document id into the passed in value buffer (the buffer size must be
    * enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -441,6 +762,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the LONG type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return LONG values at the given document id
+   */
+  default long[] getLongMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Reads the FLOAT type multi-value at the given document id into the passed in value buffer (the buffer size must be
    * enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -455,6 +787,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the FLOAT type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return FLOAT values at the given document id
+   */
+  default float[] getFloatMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Reads the DOUBLE type multi-value at the given document id into the passed in value buffer (the buffer size must
    * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -469,6 +812,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the DOUBLE type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return DOUBLE values at the given document id
+   */
+  default double[] getDoubleMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Reads the STRING type multi-value at the given document id into the passed in value buffer (the buffer size must
    * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -483,6 +837,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * Reads the STRING type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return STRING values at the given document id
+   */
+  default String[] getStringMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer size must
    * be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -496,4 +861,26 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Reads the bytes type multi-value at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return BYTE values at the given document id
+   */
+  default byte[][] getBytesMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the number of values in the multi-value entry at the given document id.
+   *
+   * @param docId Document id
+   * @param context Reader context
+   * @return Number of values within the multi-value entry
+   */
+  default int getNumValuesMV(int docId, T context) {
+    throw new UnsupportedOperationException();
+  }
 }
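(Editor's note: a closing sketch of the per-document convenience reads, not part of the commit. It assumes createContext() and a closeable reader context, which chunk-based readers use to release off-heap buffers; readers that need no context may return null, which try-with-resources tolerates.)

    // Hypothetical caller: per-document reads with the new allocating APIs;
    // `reader` and `docId` are assumed given, construction is omitted.
    try (ForwardIndexReaderContext context = reader.createContext()) {
      int numValues = reader.getNumValuesMV(docId, context);  // entry length only
      String[] values = reader.getStringMV(docId, context);   // right-sized copy
    }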


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org