You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/28 23:40:47 UTC

(pinot) branch master updated: Add a new MV forward index to only store unique MV values (#11993)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d654cc9db2 Add a new MV forward index to only store unique MV values (#11993)
d654cc9db2 is described below

commit d654cc9db2c8561d8b7856c316c6a00c73cc7ff5
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 28 15:40:42 2023 -0800

    Add a new MV forward index to only store unique MV values (#11993)
---
 .../pinot/core/startree/v2/BaseStarTreeV2Test.java |   7 +-
 .../tests/OfflineClusterIntegrationTest.java       |  48 +++-
 .../local/io/util/FixedBitIntReaderWriter.java     |   8 +
 .../segment/local/io/util/PinotDataBitSet.java     |   6 +
 .../FixedBitMVEntryDictForwardIndexWriter.java     | 126 +++++++++
 .../MultiValueEntryDictForwardIndexCreator.java    |  67 +++++
 .../index/forward/ForwardIndexCreatorFactory.java  |  10 +-
 .../index/forward/ForwardIndexReaderFactory.java   |  11 +-
 .../segment/index/forward/ForwardIndexType.java    |  78 ++----
 .../segment/index/loader/ForwardIndexHandler.java  | 284 ++++++++++-----------
 .../segment/index/loader/IndexLoadingConfig.java   |  15 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  19 +-
 .../FixedBitMVEntryDictForwardIndexReader.java     | 155 +++++++++++
 .../segment/local/utils/TableConfigUtils.java      | 106 ++++----
 .../FixedBitMVEntryDictForwardIndexTest.java       | 123 +++++++++
 .../index/forward/ForwardIndexTypeTest.java        |  56 ++--
 .../index/loader/ForwardIndexHandlerTest.java      | 151 ++++++++---
 .../index/loader/SegmentPreProcessorTest.java      |  27 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |  60 +++--
 .../spi/compression/DictIdCompressionType.java     |  47 ++++
 .../segment/spi/index/ForwardIndexConfig.java      |  81 ++++--
 .../spi/index/reader/ForwardIndexReader.java       |  11 +-
 .../apache/pinot/spi/config/table/FieldConfig.java |  24 +-
 23 files changed, 1121 insertions(+), 399 deletions(-)

diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
index 3787363d17..63cc2c2bda 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java
@@ -462,7 +462,12 @@ abstract class BaseStarTreeV2Test<R, A> {
    */
   CompressionCodec getCompressionCodec() {
     CompressionCodec[] compressionCodecs = CompressionCodec.values();
-    return compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
+    while (true) {
+      CompressionCodec compressionCodec = compressionCodecs[RANDOM.nextInt(compressionCodecs.length)];
+      if (compressionCodec.isApplicableToRawIndex()) {
+        return compressionCodec;
+      }
+    }
   }
 
   abstract ValueAggregator<R, A> getValueAggregator();
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 824991d86f..86bafbd2f0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -64,6 +64,8 @@ import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -154,7 +156,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap";
   private static final String COLUMN_CARDINALITY_MAP_KEY = "columnCardinalityMap";
   private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = "maxNumMultiValuesMap";
-  private static final int DISK_SIZE_IN_BYTES = 20798784;
+  private static final int DISK_SIZE_IN_BYTES = 20277762;
   private static final int NUM_ROWS = 115545;
 
   private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
@@ -178,6 +180,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return _schemaFileName;
   }
 
+  @Override
+  protected List<FieldConfig> getFieldConfigs() {
+    return Collections.singletonList( // single field config: DivAirports with the MV_ENTRY_DICT compression codec
+        new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+            CompressionCodec.MV_ENTRY_DICT, null)); // presumably so the test exercises the new MV forward index
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -645,7 +654,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testMaxQueryResponseSizeTableConfig() throws Exception {
+  public void testMaxQueryResponseSizeTableConfig()
+      throws Exception {
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L, null));
     updateTableConfig(tableConfig);
@@ -677,7 +687,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testMaxServerResponseSizeTableConfig() throws Exception {
+  public void testMaxServerResponseSizeTableConfig()
+      throws Exception {
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 1000L));
     updateTableConfig(tableConfig);
@@ -709,7 +720,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testMaxResponseSizeTableConfigOrdering() throws Exception {
+  public void testMaxResponseSizeTableConfigOrdering()
+      throws Exception {
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000000L, 1000L));
     updateTableConfig(tableConfig);
@@ -1569,12 +1581,39 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transformConfigs);
     tableConfig.setIngestionConfig(ingestionConfig);
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    assertNotNull(fieldConfigList);
+    fieldConfigList.add(
+        new FieldConfig("NewAddedDerivedDivAirportSeqIDs", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+            CompressionCodec.MV_ENTRY_DICT, null));
+    fieldConfigList.add(new FieldConfig("NewAddedDerivedDivAirportSeqIDsString", FieldConfig.EncodingType.DICTIONARY,
+        Collections.emptyList(), CompressionCodec.MV_ENTRY_DICT, null));
     updateTableConfig(tableConfig);
 
     // Trigger reload
     reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
         numTotalDocs);
+
+    // Verify the index sizes
+    JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest(
+            getControllerBaseApiUrl() + "/tables/mytable/metadata?columns=DivAirportSeqIDs"
+                + "&columns=NewAddedDerivedDivAirportSeqIDs&columns=NewAddedDerivedDivAirportSeqIDsString"))
+        .get("columnIndexSizeMap");
+    assertEquals(columnIndexSizeMap.size(), 3);
+    JsonNode originalColumnIndexSizes = columnIndexSizeMap.get("DivAirportSeqIDs");
+    JsonNode derivedColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDs");
+    JsonNode derivedStringColumnIndexSizes = columnIndexSizeMap.get("NewAddedDerivedDivAirportSeqIDsString");
+
+    // Derived int column should have the same dictionary size as the original column
+    double originalColumnDictionarySize = originalColumnIndexSizes.get("dictionary").asDouble();
+    assertEquals(derivedColumnIndexSizes.get("dictionary").asDouble(), originalColumnDictionarySize);
+    // Derived string column should have larger dictionary size than the original column
+    assertTrue(derivedStringColumnIndexSizes.get("dictionary").asDouble() > originalColumnDictionarySize);
+    // Both derived columns should have smaller forward index size than the original column because of compression
+    double derivedColumnForwardIndexSize = derivedColumnIndexSizes.get("forward_index").asDouble();
+    assertTrue(derivedColumnForwardIndexSize < originalColumnIndexSizes.get("forward_index").asDouble());
+    assertEquals(derivedStringColumnIndexSizes.get("forward_index").asDouble(), derivedColumnForwardIndexSize);
   }
 
   private void reloadWithMissingColumns()
@@ -1582,6 +1621,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Remove columns from the table config first to pass the validation of the table config
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setIngestionConfig(null);
+    tableConfig.setFieldConfigList(getFieldConfigs());
     updateTableConfig(tableConfig);
 
     // Need to first delete then add the schema because removing columns is backward-incompatible change
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
index 76e1a154c8..3ef165740f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/FixedBitIntReaderWriter.java
@@ -50,6 +50,14 @@ public final class FixedBitIntReaderWriter implements Closeable {
     _dataBitSet.writeInt(startIndex, _numBitsPerValue, length, values);
   }
 
+  public int getStartByteOffset(int index) { // offset is relative to the start of the underlying bit set
+    return (int) (((long) index * _numBitsPerValue) / Byte.SIZE); // byte holding the value's first bit (inclusive)
+  }
+
+  public int getEndByteOffset(int index) { // long arithmetic keeps the bit offset from overflowing an int
+    return (int) (((long) (index + 1) * _numBitsPerValue - 1) / Byte.SIZE) + 1; // one past the byte of its last bit
+  }
+
   @Override
   public void close() {
     _dataBitSet.close();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
index 4ed84a2c0c..a42665e941 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/util/PinotDataBitSet.java
@@ -102,6 +102,9 @@ public final class PinotDataBitSet implements Closeable {
   }
 
   public void readInt(int startIndex, int numBitsPerValue, int length, int[] buffer) {
+    if (length == 0) {
+      return;
+    }
     long startBitOffset = (long) startIndex * numBitsPerValue;
     int byteOffset = (int) (startBitOffset / Byte.SIZE);
     int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE);
@@ -167,6 +170,9 @@ public final class PinotDataBitSet implements Closeable {
   }
 
   public void writeInt(int startIndex, int numBitsPerValue, int length, int[] values) {
+    if (length == 0) {
+      return;
+    }
     long startBitOffset = (long) startIndex * numBitsPerValue;
     int byteOffset = (int) (startBitOffset / Byte.SIZE);
     int bitOffsetInFirstByte = (int) (startBitOffset % Byte.SIZE);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java
new file mode 100644
index 0000000000..80a9a3dfa3
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/FixedBitMVEntryDictForwardIndexWriter.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.io.writer.impl;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+
+
+/**
+ * Bit-compressed dictionary-encoded forward index writer for multi-value columns, where a second-level dictionary
+ * encoding of whole multi-value entries (instead of individual values within the entry) is maintained within the
+ * forward index. Entries are buffered in memory by putDictIds() and the index file is only written in close().
+ *
+ * Index layout:
+ * - Index header (24 bytes)
+ * - ID buffer (stores the multi-value entry id for each doc id, in the order the docs were added)
+ * - Offset buffer (stores the start offset of each unique entry, followed by the end offset of the last entry)
+ * - Value buffer (stores the dict ids of each unique entry, concatenated in entry id order)
+ *
+ * Header layout (written big-endian):
+ * - Magic marker (4 bytes)
+ * - Version (2 bytes)
+ * - Bits per value (1 byte)
+ * - Bits per id (1 byte)
+ * - Number of unique MV entries (4 bytes)
+ * - Number of total values (4 bytes)
+ * - Start offset of offset buffer (4 bytes)
+ * - Start offset of value buffer (4 bytes)
+ */
+public class FixedBitMVEntryDictForwardIndexWriter implements Closeable {
+  public static final int MAGIC_MARKER = 0xffabcdef;
+  public static final short VERSION = 1;
+  public static final int HEADER_SIZE = 24; // 4 + 2 + 1 + 1 + 4 + 4 + 4 + 4 bytes, see header layout above
+
+  private final Object2IntOpenHashMap<IntArrayList> _entryToIdMap = new Object2IntOpenHashMap<>(); // entry -> id
+  private final File _file;
+  private final int _numBitsPerValue;
+  private final IntArrayList _ids; // entry id of each doc, in the order putDictIds() was called
+
+  public FixedBitMVEntryDictForwardIndexWriter(File file, int numDocs, int numBitsPerValue) {
+    _file = file;
+    _numBitsPerValue = numBitsPerValue;
+    _ids = new IntArrayList(numDocs); // initial capacity of numDocs
+  }
+
+  public void putDictIds(int[] dictIds) {
+    // Look up the entry; a new entry gets the next id (the current map size). The array is wrapped, not copied.
+    _ids.add(_entryToIdMap.computeIntIfAbsent(IntArrayList.wrap(dictIds), k -> _entryToIdMap.size()));
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    int numUniqueEntries = _entryToIdMap.size();
+    int[][] idToDictIdsMap = new int[numUniqueEntries][]; // inverse of _entryToIdMap, indexed by entry id
+    int numTotalValues = 0; // total number of dict ids over all unique entries
+    for (Object2IntMap.Entry<IntArrayList> entry : _entryToIdMap.object2IntEntrySet()) {
+      int id = entry.getIntValue();
+      int[] dictIds = entry.getKey().elements(); // backing array of the key wrapped in putDictIds()
+      idToDictIdsMap[id] = dictIds;
+      numTotalValues += dictIds.length;
+    }
+    int[] ids = _ids.elements(); // NOTE(review): backing array; assumes exactly numDocs entries were put - confirm
+    int numBitsPerId = PinotDataBitSet.getNumBitsPerValue(numUniqueEntries - 1); // largest id assigned
+    int idBufferSize = (int) (((long) ids.length * numBitsPerId + 7) / 8); // bits rounded up to whole bytes
+    int numBitsPerOffset = PinotDataBitSet.getNumBitsPerValue(numTotalValues); // derived here, not put in the header
+    int offsetBufferSize = (int) (((long) (numUniqueEntries + 1) * numBitsPerOffset + 7) / 8); // +1: end offset
+    int valueBufferSize = (int) (((long) numTotalValues * _numBitsPerValue + 7) / 8);
+    int offsetBufferOffset = HEADER_SIZE + idBufferSize; // buffers are laid out back to back after the header
+    int valueBufferOffset = offsetBufferOffset + offsetBufferSize;
+    int indexSize = valueBufferOffset + valueBufferSize;
+    try (PinotDataBuffer indexBuffer = PinotDataBuffer.mapFile(_file, false, 0, indexSize, ByteOrder.BIG_ENDIAN,
+        getClass().getSimpleName())) {
+      indexBuffer.putInt(0, MAGIC_MARKER);
+      indexBuffer.putShort(4, VERSION);
+      indexBuffer.putByte(6, (byte) _numBitsPerValue);
+      indexBuffer.putByte(7, (byte) numBitsPerId);
+      indexBuffer.putInt(8, numUniqueEntries);
+      indexBuffer.putInt(12, numTotalValues);
+      indexBuffer.putInt(16, offsetBufferOffset);
+      indexBuffer.putInt(20, valueBufferOffset);
+
+      try (FixedBitIntReaderWriter idWriter = new FixedBitIntReaderWriter(
+          indexBuffer.view(HEADER_SIZE, offsetBufferOffset), ids.length, numBitsPerId)) {
+        idWriter.writeInt(0, ids.length, ids); // all entry ids in one bulk write
+      }
+      try (FixedBitIntReaderWriter offsetWriter = new FixedBitIntReaderWriter(
+          indexBuffer.view(offsetBufferOffset, valueBufferOffset), numUniqueEntries + 1, numBitsPerOffset);
+          FixedBitIntReaderWriter valueWriter = new FixedBitIntReaderWriter(
+              indexBuffer.view(valueBufferOffset, indexSize), numTotalValues, _numBitsPerValue)) {
+        int startOffset = 0; // running position in the value buffer, counted in values (not bytes)
+        for (int i = 0; i < numUniqueEntries; i++) {
+          offsetWriter.writeInt(i, startOffset);
+          int[] dictIds = idToDictIdsMap[i];
+          valueWriter.writeInt(startOffset, dictIds.length, dictIds);
+          startOffset += dictIds.length;
+        }
+        offsetWriter.writeInt(numUniqueEntries, startOffset); // trailing end offset of the last entry
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java
new file mode 100644
index 0000000000..2db269f038
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueEntryDictForwardIndexCreator.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Forward index creator for dictionary-encoded multi-value columns that also dictionary-encodes whole MV entries.
+ */
+public class MultiValueEntryDictForwardIndexCreator implements ForwardIndexCreator {
+  private final FixedBitMVEntryDictForwardIndexWriter _writer; // buffers the entries and writes the file on close()
+
+  public MultiValueEntryDictForwardIndexCreator(File outputDir, String column, int cardinality, int numDocs) {
+    File indexFile = new File(outputDir, column + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+    int numBitsPerValue = PinotDataBitSet.getNumBitsPerValue(cardinality - 1); // presumably the max dict id
+    _writer = new FixedBitMVEntryDictForwardIndexWriter(indexFile, numDocs, numBitsPerValue);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return true;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getValueType() {
+    return DataType.INT; // the values written are int dict ids
+  }
+
+  @Override
+  public void putDictIdMV(int[] dictIds) {
+    _writer.putDictIds(dictIds); // the writer wraps (does not copy) the array, so do not modify it afterwards
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _writer.close(); // this is where the index file actually gets written
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
index a57158b44f..ded5000b0c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexCreatorFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.segment.index.forward;
 
 import java.io.File;
 import java.io.IOException;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
@@ -29,6 +30,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSorted
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -57,8 +59,12 @@ public class ForwardIndexCreatorFactory {
           return new SingleValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs);
         }
       } else {
-        return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs,
-            context.getTotalNumberOfEntries());
+        if (indexConfig.getDictIdCompressionType() == DictIdCompressionType.MV_ENTRY_DICT) {
+          return new MultiValueEntryDictForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs);
+        } else {
+          return new MultiValueUnsortedForwardIndexCreator(indexDir, columnName, cardinality, numTotalDocs,
+              context.getTotalNumberOfEntries());
+        }
       }
     } else {
       // Dictionary disabled columns
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
index 8408a9026d..b5ca2b83c1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java
@@ -20,6 +20,7 @@
 package org.apache.pinot.segment.local.segment.index.forward;
 
 import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
@@ -72,8 +73,14 @@ public class ForwardIndexReaderFactory extends IndexReaderFactory.Default<Forwar
           return new FixedBitSVForwardIndexReaderV2(dataBuffer, metadata.getTotalDocs(), metadata.getBitsPerElement());
         }
       } else {
-        return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(), metadata.getTotalNumberOfEntries(),
-            metadata.getBitsPerElement());
+        if (dataBuffer.size() > Integer.BYTES
+            && dataBuffer.getInt(0) == FixedBitMVEntryDictForwardIndexReader.MAGIC_MARKER) {
+          return new FixedBitMVEntryDictForwardIndexReader(dataBuffer, metadata.getTotalDocs(),
+              metadata.getBitsPerElement());
+        } else {
+          return new FixedBitMVForwardIndexReader(dataBuffer, metadata.getTotalDocs(),
+              metadata.getTotalNumberOfEntries(), metadata.getBitsPerElement());
+        }
       }
     } else {
       return createRawIndexReader(dataBuffer, metadata.getDataType().getStoredType(), metadata.isSingleValue());
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
index 369341be6c..a454bf9cfb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexType.java
@@ -21,7 +21,6 @@ package org.apache.pinot.segment.local.segment.index.forward;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,7 +28,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteSVMutableForwardIndex;
@@ -59,6 +57,7 @@ import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -91,54 +90,29 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
 
   @Override
   public Map<String, ForwardIndexConfig> fromIndexLoadingConfig(IndexLoadingConfig indexLoadingConfig) {
-    Set<String> disabledCols = indexLoadingConfig.getForwardIndexDisabledColumns();
+    Set<String> disabledColumns = indexLoadingConfig.getForwardIndexDisabledColumns();
+    Map<String, CompressionCodec> compressionCodecMap = indexLoadingConfig.getCompressionConfigs();
     Map<String, ForwardIndexConfig> result = new HashMap<>();
-    Set<String> allColumns = Sets.union(disabledCols, indexLoadingConfig.getAllKnownColumns());
-    for (String column : allColumns) {
-      ChunkCompressionType compressionType =
-          indexLoadingConfig.getCompressionConfigs() != null
-              ? indexLoadingConfig.getCompressionConfigs().get(column)
-              : null;
-      Supplier<ForwardIndexConfig> defaultConfig = () -> {
-        if (compressionType == null) {
-          return ForwardIndexConfig.DEFAULT;
-        } else {
-          return new ForwardIndexConfig.Builder().withCompressionType(compressionType).build();
-        }
-      };
-      if (!disabledCols.contains(column)) {
-        TableConfig tableConfig = indexLoadingConfig.getTableConfig();
-        if (tableConfig == null) {
-          result.put(column, defaultConfig.get());
-        } else {
-          List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
-          if (fieldConfigList == null) {
-            result.put(column, defaultConfig.get());
-            continue;
-          }
-          FieldConfig fieldConfig = fieldConfigList.stream()
-              .filter(fc -> fc.getName().equals(column))
-              .findAny()
-              .orElse(null);
-          if (fieldConfig == null) {
-            result.put(column, defaultConfig.get());
-            continue;
-          }
-          ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
-          if (compressionType != null) {
-            builder.withCompressionType(compressionType);
-          } else {
-            FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
-            if (compressionCodec != null) {
-              builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
+    for (String column : indexLoadingConfig.getAllKnownColumns()) {
+      ForwardIndexConfig forwardIndexConfig;
+      if (!disabledColumns.contains(column)) {
+        CompressionCodec compressionCodec = compressionCodecMap.get(column);
+        if (compressionCodec == null) {
+          TableConfig tableConfig = indexLoadingConfig.getTableConfig();
+          if (tableConfig != null && tableConfig.getFieldConfigList() != null) {
+            FieldConfig fieldConfig =
+                tableConfig.getFieldConfigList().stream().filter(fc -> fc.getName().equals(column)).findAny()
+                    .orElse(null);
+            if (fieldConfig != null) {
+              compressionCodec = fieldConfig.getCompressionCodec();
             }
           }
-
-          result.put(column, builder.build());
         }
+        forwardIndexConfig = new ForwardIndexConfig.Builder().withCompressionCodec(compressionCodec).build();
       } else {
-        result.put(column, ForwardIndexConfig.DISABLED);
+        forwardIndexConfig = ForwardIndexConfig.DISABLED;
       }
+      result.put(column, forwardIndexConfig);
     }
     return result;
   }
@@ -169,13 +143,10 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
           if (properties != null && isDisabled(properties)) {
             fwdConfig.put(fieldConfig.getName(), ForwardIndexConfig.DISABLED);
           } else {
-            DictionaryIndexConfig dictConfig = dictConfigs.get(fieldConfig.getName());
-            if (dictConfig != null && dictConfig.isDisabled()) {
-              fwdConfig.put(fieldConfig.getName(), createConfigFromFieldConfig(fieldConfig));
+            ForwardIndexConfig config = createConfigFromFieldConfig(fieldConfig);
+            if (!config.equals(ForwardIndexConfig.DEFAULT)) {
+              fwdConfig.put(fieldConfig.getName(), config);
             }
-            // On other case encoding is DICTIONARY. We create the default forward index by default. That means that if
-            // field config indicates for example a compression while using encoding dictionary, the compression is
-            // ignored.
             // It is important to do not explicitly add the default value here in order to avoid exclusive problems with
             // the default `fromIndexes` deserializer.
           }
@@ -193,17 +164,12 @@ public class ForwardIndexType extends AbstractIndexType<ForwardIndexConfig, Forw
   }
 
   private ForwardIndexConfig createConfigFromFieldConfig(FieldConfig fieldConfig) {
-    FieldConfig.CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
     ForwardIndexConfig.Builder builder = new ForwardIndexConfig.Builder();
-    if (compressionCodec != null) {
-      builder.withCompressionType(ChunkCompressionType.valueOf(compressionCodec.name()));
-    }
-
+    builder.withCompressionCodec(fieldConfig.getCompressionCodec());
     Map<String, String> properties = fieldConfig.getProperties();
     if (properties != null) {
       builder.withLegacyProperties(properties);
     }
-
     return builder.build();
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 5b5ad75de6..ac32a0a354 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,6 +46,7 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.IndexCreationContext;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
@@ -98,11 +98,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   private final Schema _schema;
 
   protected enum Operation {
-    DISABLE_FORWARD_INDEX,
-    ENABLE_FORWARD_INDEX,
-    DISABLE_DICTIONARY,
-    ENABLE_DICTIONARY,
-    CHANGE_RAW_INDEX_COMPRESSION_TYPE,
+    DISABLE_FORWARD_INDEX, ENABLE_FORWARD_INDEX, DISABLE_DICTIONARY, ENABLE_DICTIONARY, CHANGE_INDEX_COMPRESSION_TYPE
   }
 
   @VisibleForTesting
@@ -147,8 +143,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
             ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, false);
             if (columnMetadata.hasDictionary()) {
               if (!segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
-                throw new IllegalStateException(String.format("Dictionary should still exist after rebuilding "
-                    + "forward index for dictionary column: %s", column));
+                throw new IllegalStateException(String.format(
+                    "Dictionary should still exist after rebuilding forward index for dictionary column: %s", column));
               }
             } else {
               if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
@@ -159,8 +155,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
             }
             break;
           case DISABLE_DICTIONARY:
-            Set<String> newForwardIndexDisabledColumns = FieldIndexConfigsUtil.columnsWithIndexDisabled(
-                _fieldIndexConfigs.keySet(), StandardIndexes.forward(), _fieldIndexConfigs);
+            Set<String> newForwardIndexDisabledColumns =
+                FieldIndexConfigsUtil.columnsWithIndexDisabled(_fieldIndexConfigs.keySet(), StandardIndexes.forward(),
+                    _fieldIndexConfigs);
             if (newForwardIndexDisabledColumns.contains(column)) {
               removeDictionaryFromForwardIndexDisabledColumn(column, segmentWriter);
               if (segmentWriter.hasIndexFor(column, StandardIndexes.dictionary())) {
@@ -177,8 +174,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
               throw new IllegalStateException(String.format("Forward index was not created for column: %s", column));
             }
             break;
-          case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
-            rewriteRawForwardIndexForCompressionChange(column, segmentWriter);
+          case CHANGE_INDEX_COMPRESSION_TYPE:
+            rewriteForwardIndexForCompressionChange(column, segmentWriter);
             break;
           default:
             throw new IllegalStateException("Unsupported operation for column " + column);
@@ -197,26 +194,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
       return columnOperationsMap;
     }
 
-    // From existing column config.
     Set<String> existingAllColumns = _segmentDirectory.getSegmentMetadata().getAllColumns();
-    Set<String> existingDictColumns =
-        segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.dictionary());
-    Set<String> existingNoDictColumns = new HashSet<>();
-    for (String column : existingAllColumns) {
-      if (!existingDictColumns.contains(column)) {
-        existingNoDictColumns.add(column);
-      }
-    }
-
-    // Get list of columns with forward index and those without forward index
-    Set<String> existingForwardIndexColumns =
-        segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.forward());
-    Set<String> existingForwardIndexDisabledColumns = new HashSet<>();
-    for (String column : existingAllColumns) {
-      if (!existingForwardIndexColumns.contains(column)) {
-        existingForwardIndexDisabledColumns.add(column);
-      }
-    }
+    Set<String> existingDictColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.dictionary());
+    Set<String> existingForwardIndexColumns = _segmentDirectory.getColumnsWithIndex(StandardIndexes.forward());
 
     for (String column : existingAllColumns) {
       if (_schema != null && !_schema.hasColumn(column)) {
@@ -224,12 +204,14 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         LOGGER.info("Column {} is not in schema, skipping updating forward index", column);
         continue;
       }
+      boolean existingHasDict = existingDictColumns.contains(column);
+      boolean existingHasFwd = existingForwardIndexColumns.contains(column);
       FieldIndexConfigs newConf = _fieldIndexConfigs.get(column);
       boolean newIsFwd = newConf.getConfig(StandardIndexes.forward()).isEnabled();
       boolean newIsDict = newConf.getConfig(StandardIndexes.dictionary()).isEnabled();
       boolean newIsRange = newConf.getConfig(StandardIndexes.range()).isEnabled();
 
-      if (existingForwardIndexColumns.contains(column) && !newIsFwd) {
+      if (existingHasFwd && !newIsFwd) {
         // Existing column has a forward index. New column config disables the forward index
 
         ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
@@ -240,13 +222,13 @@ public class ForwardIndexHandler extends BaseIndexHandler {
           continue;
         }
 
-        if (existingDictColumns.contains(column)) {
+        if (existingHasDict) {
           if (!newIsDict) {
             // Dictionary was also disabled. Just disable the dictionary and remove it along with the forward index
             // If range index exists, don't try to regenerate it on toggling the dictionary, throw an error instead
-            Preconditions.checkState(!newIsRange,
-                String.format("Must disable range (enabled) index to disable the dictionary and forward index for "
-                        + "column: %s or refresh / back-fill the forward index", column));
+            Preconditions.checkState(!newIsRange, String.format(
+                "Must disable range (enabled) index to disable the dictionary and forward index for column: %s or "
+                    + "refresh / back-fill the forward index", column));
             columnOperationsMap.put(column,
                 Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.DISABLE_DICTIONARY));
           } else {
@@ -263,7 +245,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
                 Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.ENABLE_DICTIONARY));
           }
         }
-      } else if (existingForwardIndexDisabledColumns.contains(column) && newIsFwd) {
+      } else if (!existingHasFwd && newIsFwd) {
         // Existing column does not have a forward index. New column config enables the forward index
 
         ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
@@ -277,37 +259,37 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         // Get list of columns with inverted index
         Set<String> existingInvertedIndexColumns =
             segmentReader.toSegmentDirectory().getColumnsWithIndex(StandardIndexes.inverted());
-        if (!existingDictColumns.contains(column) || !existingInvertedIndexColumns.contains(column)) {
+        if (!existingHasDict || !existingInvertedIndexColumns.contains(column)) {
           // If either dictionary or inverted index is missing on the column there is no way to re-generate the forward
           // index. Treat this as a no-op and log a warning.
           LOGGER.warn("Trying to enable the forward index for a column {} missing either the dictionary ({}) and / or "
                   + "the inverted index ({}) is not possible. Either a refresh or back-fill is required to get the "
-                  + "forward index, ignoring", column, existingDictColumns.contains(column) ? "enabled" : "disabled",
+                  + "forward index, ignoring", column, existingHasDict ? "enabled" : "disabled",
               existingInvertedIndexColumns.contains(column) ? "enabled" : "disabled");
           continue;
         }
 
         columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_FORWARD_INDEX));
-      } else if (existingForwardIndexDisabledColumns.contains(column) && !newIsFwd) {
+      } else if (!existingHasFwd) {
         // Forward index is disabled for the existing column and should remain disabled based on the latest config
         // Need some checks to see whether the dictionary is being enabled or disabled here and take appropriate actions
 
         // If the dictionary is not enabled on the existing column it must be on the new noDictionary column list.
         // Cannot enable the dictionary for a column with forward index disabled.
-        Preconditions.checkState(existingDictColumns.contains(column) || !newIsDict,
+        Preconditions.checkState(existingHasDict || !newIsDict,
             String.format("Cannot regenerate the dictionary for column %s with forward index disabled. Please "
                 + "refresh or back-fill the data to add back the forward index", column));
 
-        if (existingDictColumns.contains(column) && !newIsDict) {
+        if (existingHasDict && !newIsDict) {
           // Dictionary is currently enabled on this column but is supposed to be disabled. Remove the dictionary
           // and update the segment metadata If the range index exists then throw an error since we are not
           // regenerating the range index on toggling the dictionary
-          Preconditions.checkState(!newIsRange,
-              String.format("Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled "
-                      + "column: %s or refresh / back-fill the forward index", column));
+          Preconditions.checkState(!newIsRange, String.format(
+              "Must disable range (enabled) index to disable the dictionary for a forwardIndexDisabled column: %s or "
+                  + "refresh / back-fill the forward index", column));
           columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
         }
-      } else if (existingNoDictColumns.contains(column) && newIsDict) {
+      } else if (!existingHasDict && newIsDict) {
         // Existing column is RAW. New column is dictionary enabled.
         if (_schema == null || _tableConfig == null) {
           // This can only happen in tests.
@@ -315,25 +297,28 @@ public class ForwardIndexHandler extends BaseIndexHandler {
           continue;
         }
         ColumnMetadata existingColumnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
-        if (DictionaryIndexType.ignoreDictionaryOverride(
-            _tableConfig.getIndexingConfig().isOptimizeDictionary(),
+        if (DictionaryIndexType.ignoreDictionaryOverride(_tableConfig.getIndexingConfig().isOptimizeDictionary(),
             _tableConfig.getIndexingConfig().isOptimizeDictionaryForMetrics(),
-            _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(),
-            existingColumnMetadata.getFieldSpec(), _fieldIndexConfigs.get(column),
-            existingColumnMetadata.getCardinality(),
+            _tableConfig.getIndexingConfig().getNoDictionarySizeRatioThreshold(), existingColumnMetadata.getFieldSpec(),
+            _fieldIndexConfigs.get(column), existingColumnMetadata.getCardinality(),
             existingColumnMetadata.getTotalNumberOfEntries())) {
           columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_DICTIONARY));
         }
-      } else if (existingDictColumns.contains(column) && !newIsDict) {
+      } else if (existingHasDict && !newIsDict) {
         // Existing column has dictionary. New config for the column is RAW.
         if (shouldDisableDictionary(column, _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column))) {
           columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
         }
-      } else if (existingNoDictColumns.contains(column) && !newIsDict) {
+      } else if (!existingHasDict) {
         // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
         // TODO: Also check if raw index version needs to be changed
-        if (shouldChangeCompressionType(column, segmentReader)) {
-          columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+        if (shouldChangeRawCompressionType(column, segmentReader)) {
+          columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE));
+        }
+      } else {
+        // Both existing and new column is dictionary encoded. Check if compression needs to be changed.
+        if (shouldChangeDictIdCompressionType(column, segmentReader)) {
+          columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_INDEX_COMPRESSION_TYPE));
         }
       }
     }
@@ -346,8 +331,9 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     // Remove the dictionary and update the metadata to indicate that the dictionary is no longer present
     segmentWriter.removeIndex(column, StandardIndexes.dictionary());
     String segmentName = _segmentDirectory.getSegmentMetadata().getName();
-    LOGGER.info("Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and "
-            + "column={}", segmentName, column);
+    LOGGER.info(
+        "Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and column={}",
+        segmentName, column);
     Map<String, String> metadataProperties = new HashMap<>();
     metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false));
     metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0));
@@ -381,37 +367,63 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     return true;
   }
 
-  private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader)
+  private boolean shouldChangeRawCompressionType(String column, SegmentDirectory.Reader segmentReader)
       throws Exception {
-    ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
-
     // The compression type for an existing segment can only be determined by reading the forward index header.
+    ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+    ChunkCompressionType existingCompressionType;
     try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) {
-      ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
+      existingCompressionType = fwdIndexReader.getCompressionType();
       Preconditions.checkState(existingCompressionType != null,
           "Existing compressionType cannot be null for raw forward index column=" + column);
+    }
 
-      // Get the new compression type.
-      ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward())
-          .getChunkCompressionType();
+    // Get the new compression type.
+    ChunkCompressionType newCompressionType =
+        _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getChunkCompressionType();
 
-      // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
-      // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
-      // forward indexes during segmentReload when the default compressionType changes.
-      return newCompressionType != null && existingCompressionType != newCompressionType;
+    // Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
+    // compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
+    // forward indexes during segmentReload when the default compressionType changes.
+    return newCompressionType != null && existingCompressionType != newCompressionType;
+  }
+
+  private boolean shouldChangeDictIdCompressionType(String column, SegmentDirectory.Reader segmentReader)
+      throws Exception {
+    // The compression type for an existing segment can only be determined by reading the forward index header.
+    ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
+    DictIdCompressionType existingCompressionType;
+    try (ForwardIndexReader<?> fwdIndexReader = ForwardIndexType.read(segmentReader, existingColMetadata)) {
+      existingCompressionType = fwdIndexReader.getDictIdCompressionType();
     }
+
+    // Get the new compression type.
+    DictIdCompressionType newCompressionType =
+        _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward()).getDictIdCompressionType();
+    if (newCompressionType != null && !newCompressionType.isApplicable(existingColMetadata.isSingleValue())) {
+      newCompressionType = null;
+    }
+
+    return existingCompressionType != newCompressionType;
   }
 
-  private void rewriteRawForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter)
+  private void rewriteForwardIndexForCompressionChange(String column, SegmentDirectory.Writer segmentWriter)
       throws Exception {
     ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
     boolean isSingleValue = existingColMetadata.isSingleValue();
+    boolean hasDictionary = existingColMetadata.hasDictionary();
 
     File indexDir = _segmentDirectory.getSegmentMetadata().getIndexDir();
     String segmentName = _segmentDirectory.getSegmentMetadata().getName();
     File inProgress = new File(indexDir, column + ".fwd.inprogress");
-    String fileExtension = isSingleValue ? V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION
-        : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    String fileExtension;
+    if (isSingleValue) {
+      fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION;
+    } else {
+      fileExtension = hasDictionary ? V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION
+          : V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION;
+    }
     File fwdIndexFile = new File(indexDir, column + fileExtension);
 
     if (!inProgress.exists()) {
@@ -425,20 +437,7 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     }
 
     LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
-
-    // At this point, compressionConfigs is guaranteed to contain the column. If there's no entry in the map, we
-    // wouldn't have computed the CHANGE_RAW_COMPRESSION_TYPE operation for this column as compressionType changes
-    // are processed only if a valid compressionType is specified in fieldConfig.
-    ChunkCompressionType newCompressionType = _fieldIndexConfigs.get(column)
-        .getConfig(StandardIndexes.forward()).getChunkCompressionType();
-
-    if (isSingleValue) {
-      rewriteRawSVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter,
-          newCompressionType);
-    } else {
-      rewriteRawMVForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter,
-          newCompressionType);
-    }
+    rewriteForwardIndexForCompressionChange(column, existingColMetadata, indexDir, segmentWriter);
 
     // We used the existing forward index to generate a new forward index. The existing forward index will be in V3
     // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
@@ -454,61 +453,32 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
   }
 
-  private void rewriteRawMVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata,
-      File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType)
+  private void rewriteForwardIndexForCompressionChange(String column, ColumnMetadata columnMetadata, File indexDir,
+      SegmentDirectory.Writer segmentWriter)
       throws Exception {
-    try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) {
-      // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following
-      // components:
-      // 1. bytes required to store the actual elements of the MV row (A)
-      // 2. bytes required to store the number of elements in the MV row (B)
-      // 3. bytes required to store the length of each MV element (C)
-      //
-      // lengthOfLongestEntry = A + B + C
-      // maxRowLengthInBytes = A
+    try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, columnMetadata)) {
       int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-      int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
-      int maxRowLengthInBytes =
-          MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries);
-
-      IndexCreationContext context = IndexCreationContext.builder()
-          .withMaxRowLengthInBytes(maxRowLengthInBytes)
-          .withIndexDir(indexDir)
-          .withColumnMetadata(existingColMetadata)
-          .withLengthOfLongestEntry(lengthOfLongestEntry)
-          .build();
-
-      ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
-      try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) {
-        if (!reader.getStoredType().equals(creator.getValueType())) {
-          // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
-          String failureMsg =
-              "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType()
-                  .toString() + " to " + creator.getValueType().toString();
-          throw new UnsupportedOperationException(failureMsg);
-        }
-
-        int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
+      IndexCreationContext context;
+      if (columnMetadata.isSingleValue()) {
+        context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+            .withLengthOfLongestEntry(lengthOfLongestEntry).build();
+      } else {
+        // For VarByte MV columns like String and Bytes, the storage representation of each row contains the following
+        // components:
+        // 1. bytes required to store the actual elements of the MV row (A)
+        // 2. bytes required to store the number of elements in the MV row (B)
+        // 3. bytes required to store the length of each MV element (C)
+        //
+        // lengthOfLongestEntry = A + B + C
+        // maxRowLengthInBytes = A
+        int maxNumValuesPerEntry = columnMetadata.getMaxNumberOfMultiValues();
+        int maxRowLengthInBytes =
+            MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumValuesPerEntry);
+        context = IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(columnMetadata)
+            .withLengthOfLongestEntry(lengthOfLongestEntry).withMaxRowLengthInBytes(maxRowLengthInBytes).build();
       }
-    }
-  }
-
-  private void rewriteRawSVForwardIndexForCompressionChange(String column, ColumnMetadata existingColMetadata,
-      File indexDir, SegmentDirectory.Writer segmentWriter, ChunkCompressionType newCompressionType)
-      throws Exception {
-    try (ForwardIndexReader<?> reader = ForwardIndexType.read(segmentWriter, existingColMetadata)) {
-      int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
-
-      IndexCreationContext context = IndexCreationContext.builder()
-          .withIndexDir(indexDir)
-          .withColumnMetadata(existingColMetadata)
-          .withLengthOfLongestEntry(lengthOfLongestEntry)
-          .build();
 
       ForwardIndexConfig config = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.forward());
-
       try (ForwardIndexCreator creator = StandardIndexes.forward().createIndexCreator(context, config)) {
         if (!reader.getStoredType().equals(creator.getValueType())) {
           // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes.
@@ -518,8 +488,8 @@ public class ForwardIndexHandler extends BaseIndexHandler {
           throw new UnsupportedOperationException(failureMsg);
         }
 
-        int numDocs = existingColMetadata.getTotalDocs();
-        forwardIndexRewriteHelper(column, existingColMetadata, reader, creator, numDocs, null, null);
+        int numDocs = columnMetadata.getTotalDocs();
+        forwardIndexRewriteHelper(column, columnMetadata, reader, creator, numDocs, null, null);
       }
     }
   }
@@ -528,16 +498,39 @@ public class ForwardIndexHandler extends BaseIndexHandler {
       ForwardIndexReader<?> reader, ForwardIndexCreator creator, int numDocs,
       @Nullable SegmentDictionaryCreator dictionaryCreator, @Nullable Dictionary dictionaryReader) {
     if (dictionaryReader == null && dictionaryCreator == null) {
-      // Read raw forward index and write raw forward index.
-      forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs);
-    } else if (dictionaryReader != null && dictionaryCreator == null) {
+      if (reader.isDictionaryEncoded()) {
+        Preconditions.checkState(creator.isDictionaryEncoded(), "Cannot change dictionary based forward index to raw "
+            + "forward index without providing dictionary reader for column: %s", column);
+        // Read dictionary based forward index and write dictionary based forward index.
+        forwardIndexReadDictWriteDictHelper(reader, creator, numDocs);
+      } else {
+        Preconditions.checkState(!creator.isDictionaryEncoded(), "Cannot change raw forward index to dictionary based "
+            + "forward index without providing dictionary creator for column: %s", column);
+        // Read raw forward index and write raw forward index.
+        forwardIndexReadRawWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs);
+      }
+    } else if (dictionaryCreator == null) {
       // Read dictionary based forward index and write raw forward index.
       forwardIndexReadDictWriteRawHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryReader);
     } else if (dictionaryReader == null) {
       // Read raw forward index and write dictionary based forward index.
       forwardIndexReadRawWriteDictHelper(column, existingColumnMetadata, reader, creator, numDocs, dictionaryCreator);
     } else {
-      Preconditions.checkState(false, "Invalid dict-based read/write for column=" + column);
+      throw new IllegalStateException("One of dictionary reader or creator should be null for column: " + column);
+    }
+  }
+
+  private <C extends ForwardIndexReaderContext> void forwardIndexReadDictWriteDictHelper(ForwardIndexReader<C> reader,
+      ForwardIndexCreator creator, int numDocs) {
+    C readerContext = reader.createContext();
+    if (reader.isSingleValue()) {
+      for (int i = 0; i < numDocs; i++) {
+        creator.putDictId(reader.getDictId(i, readerContext));
+      }
+    } else {
+      for (int i = 0; i < numDocs; i++) {
+        creator.putDictIdMV(reader.getDictIdMV(i, readerContext));
+      }
     }
   }
 
@@ -856,11 +849,10 @@ public class ForwardIndexHandler extends BaseIndexHandler {
 
       DictionaryIndexConfig dictConf = _fieldIndexConfigs.get(column).getConfig(StandardIndexes.dictionary());
 
-      boolean useVarLength = dictConf.getUseVarLengthDictionary()
-          || DictionaryIndexType.shouldUseVarLengthDictionary(reader.getStoredType(), statsCollector);
-      SegmentDictionaryCreator dictionaryCreator =
-          new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
-              _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength);
+      boolean useVarLength = dictConf.getUseVarLengthDictionary() || DictionaryIndexType.shouldUseVarLengthDictionary(
+          reader.getStoredType(), statsCollector);
+      SegmentDictionaryCreator dictionaryCreator = new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(),
+          _segmentDirectory.getSegmentMetadata().getIndexDir(), useVarLength);
 
       dictionaryCreator.build(statsCollector.getUniqueValuesSet());
       return dictionaryCreator;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index bdd54224e0..f7f185cc65 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -35,7 +35,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
-import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.ColumnConfigDeserializer;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
@@ -49,6 +48,7 @@ import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
 import org.apache.pinot.spi.config.table.FSTType;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.JsonIndexConfig;
@@ -93,7 +93,7 @@ public class IndexLoadingConfig {
   private boolean _enableDynamicStarTreeCreation;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
   private boolean _enableDefaultStarTree;
-  private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>();
+  private Map<String, CompressionCodec> _compressionConfigs = new HashMap<>();
   private Map<String, FieldIndexConfigs> _indexConfigsByColName = new HashMap<>();
 
   private SegmentVersion _segmentVersion;
@@ -299,8 +299,8 @@ public class IndexLoadingConfig {
             + "indexLoadingConfig for indexType: {}", _schema == null, _tableConfig == null, indexType);
         deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig);
       } else if (_segmentTier == null) {
-        deserializer = IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig)
-            .withFallbackAlternative(stdDeserializer);
+        deserializer =
+            IndexConfigDeserializer.fromMap(table -> fromIndexLoadingConfig).withFallbackAlternative(stdDeserializer);
       } else {
         // No need to fall back to fromIndexLoadingConfig which contains index configs for default tier, when looking
         // for tier specific index configs.
@@ -352,8 +352,7 @@ public class IndexLoadingConfig {
     for (FieldConfig fieldConfig : fieldConfigList) {
       String column = fieldConfig.getName();
       if (fieldConfig.getCompressionCodec() != null) {
-        ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name());
-        _compressionConfigs.put(column, compressionType);
+        _compressionConfigs.put(column, fieldConfig.getCompressionCodec());
       }
     }
   }
@@ -613,7 +612,7 @@ public class IndexLoadingConfig {
    * Used by segmentPreProcessorTest to set compression configs.
    */
   @VisibleForTesting
-  public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) {
+  public void setCompressionConfigs(Map<String, CompressionCodec> compressionConfigs) {
     _compressionConfigs = new HashMap<>(compressionConfigs);
     _dirty = true;
   }
@@ -750,7 +749,7 @@ public class IndexLoadingConfig {
    *
    * @return a map containing column name as key and compressionType as value.
    */
-  public Map<String, ChunkCompressionType> getCompressionConfigs() {
+  public Map<String, CompressionCodec> getCompressionConfigs() {
     return unmodifiable(_compressionConfigs);
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index acfd4827c6..09643966c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -36,6 +36,7 @@ import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueEntryDictForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -48,6 +49,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreInd
 import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
+import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexPlugin;
 import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
@@ -55,8 +57,11 @@ import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
 import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
+import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
+import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -780,8 +785,18 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
             }
           }
         } else {
-          try (ForwardIndexCreator forwardIndexCreator = new MultiValueUnsortedForwardIndexCreator(_indexDir, column,
-              cardinality, numDocs, indexCreationInfo.getTotalNumberOfEntries())) {
+          DictIdCompressionType dictIdCompressionType = null;
+          FieldIndexConfigs fieldIndexConfig = _indexLoadingConfig.getFieldIndexConfig(column);
+          if (fieldIndexConfig != null) {
+            ForwardIndexConfig forwardIndexConfig = fieldIndexConfig.getConfig(new ForwardIndexPlugin().getIndexType());
+            if (forwardIndexConfig != null) {
+              dictIdCompressionType = forwardIndexConfig.getDictIdCompressionType();
+            }
+          }
+          try (ForwardIndexCreator forwardIndexCreator = dictIdCompressionType == DictIdCompressionType.MV_ENTRY_DICT
+              ? new MultiValueEntryDictForwardIndexCreator(_indexDir, column, cardinality, numDocs)
+              : new MultiValueUnsortedForwardIndexCreator(_indexDir, column, cardinality, numDocs,
+                  indexCreationInfo.getTotalNumberOfEntries())) {
             for (int i = 0; i < numDocs; i++) {
               forwardIndexCreator.putDictIdMV(dictionaryCreator.indexOfMV(outputValues[i]));
             }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java
new file mode 100644
index 0000000000..05c4000903
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVEntryDictForwardIndexReader.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.readers.forward;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import org.apache.pinot.segment.local.io.util.FixedBitIntReaderWriter;
+import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Bit-compressed dictionary-encoded forward index reader for multi-value columns, where a second-level dictionary
+ * encoding for multi-value entries (instead of individual values within the entry) is maintained within the forward
+ * index.
+ * See {@link FixedBitMVEntryDictForwardIndexWriter} for index layout.
+ */
+public final class FixedBitMVEntryDictForwardIndexReader implements ForwardIndexReader<ForwardIndexReaderContext> {
+  public static final int MAGIC_MARKER = FixedBitMVEntryDictForwardIndexWriter.MAGIC_MARKER;
+  public static final short VERSION = FixedBitMVEntryDictForwardIndexWriter.VERSION;
+  private static final int HEADER_SIZE = FixedBitMVEntryDictForwardIndexWriter.HEADER_SIZE;
+
+  private final FixedBitIntReaderWriter _idReader;
+  private final int _offsetBufferOffset;
+  private final FixedBitIntReaderWriter _offsetReader;
+  private final int _valueBufferOffset;
+  private final FixedBitIntReaderWriter _valueReader;
+
+  public FixedBitMVEntryDictForwardIndexReader(PinotDataBuffer dataBuffer, int numDocs, int numBitsPerValue) {
+    int magicMarker = dataBuffer.getInt(0);
+    Preconditions.checkState(magicMarker == MAGIC_MARKER, "Invalid magic marker: %s (expected: %s)", magicMarker,
+        MAGIC_MARKER);
+    short version = dataBuffer.getShort(4);
+    Preconditions.checkState(version == VERSION, "Invalid version: %s (expected: %s)", version, VERSION);
+    int numBitsPerValueInHeader = dataBuffer.getByte(6);
+    Preconditions.checkState(numBitsPerValueInHeader == numBitsPerValue, "Invalid numBitsPerValue: %s (expected: %s)",
+        numBitsPerValueInHeader, numBitsPerValue);
+    int numBitsPerId = dataBuffer.getByte(7);
+    int numUniqueEntries = dataBuffer.getInt(8);
+    int numTotalValues = dataBuffer.getInt(12);
+    _offsetBufferOffset = dataBuffer.getInt(16);
+    _valueBufferOffset = dataBuffer.getInt(20);
+    _idReader = new FixedBitIntReaderWriter(dataBuffer.view(HEADER_SIZE, _offsetBufferOffset), numDocs, numBitsPerId);
+    _offsetReader =
+        new FixedBitIntReaderWriter(dataBuffer.view(_offsetBufferOffset, _valueBufferOffset), numUniqueEntries + 1,
+            PinotDataBitSet.getNumBitsPerValue(numTotalValues));
+    _valueReader = new FixedBitIntReaderWriter(dataBuffer.view(_valueBufferOffset, dataBuffer.size()), numTotalValues,
+        numBitsPerValue);
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return true;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return false;
+  }
+
+  @Override
+  public DataType getStoredType() {
+    return DataType.INT;
+  }
+
+  @Override
+  public DictIdCompressionType getDictIdCompressionType() {
+    return DictIdCompressionType.MV_ENTRY_DICT;
+  }
+
+  @Override
+  public int getDictIdMV(int docId, int[] dictIdBuffer, ForwardIndexReaderContext context) {
+    int id = _idReader.readInt(docId);
+    int startIndex = _offsetReader.readInt(id);
+    int numValues = _offsetReader.readInt(id + 1) - startIndex;
+    _valueReader.readInt(startIndex, numValues, dictIdBuffer);
+    return numValues;
+  }
+
+  @Override
+  public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+    int id = _idReader.readInt(docId);
+    int startIndex = _offsetReader.readInt(id);
+    int numValues = _offsetReader.readInt(id + 1) - startIndex;
+    int[] dictIdBuffer = new int[numValues];
+    _valueReader.readInt(startIndex, numValues, dictIdBuffer);
+    return dictIdBuffer;
+  }
+
+  @Override
+  public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+    int id = _idReader.readInt(docId);
+    return _offsetReader.readInt(id + 1) - _offsetReader.readInt(id);
+  }
+
+  @Override
+  public boolean isBufferByteRangeInfoSupported() {
+    return true;
+  }
+
+  @Override
+  public void recordDocIdByteRanges(int docId, ForwardIndexReaderContext context, List<ByteRange> ranges) {
+    int id = _idReader.readInt(docId);
+    int idReaderStartByteOffset = _idReader.getStartByteOffset(docId);
+    int idReaderEndByteOffset = _idReader.getEndByteOffset(docId);
+    ranges.add(new ByteRange(HEADER_SIZE + idReaderStartByteOffset, idReaderEndByteOffset - idReaderStartByteOffset));
+
+    int startIndex = _offsetReader.readInt(id);
+    int numValues = _offsetReader.readInt(id + 1) - startIndex;
+    int offsetReaderStartByteOffset = _offsetReader.getStartByteOffset(id);
+    int offsetReaderEndByteOffset = _offsetReader.getEndByteOffset(id + 1);
+    ranges.add(new ByteRange(_offsetBufferOffset + offsetReaderStartByteOffset,
+        offsetReaderEndByteOffset - offsetReaderStartByteOffset));
+
+    int valueReaderStartByteOffset = _valueReader.getStartByteOffset(startIndex);
+    int valueReaderEndByteOffset = _valueReader.getEndByteOffset(startIndex + numValues - 1);
+    ranges.add(new ByteRange(_valueBufferOffset + valueReaderStartByteOffset,
+        valueReaderEndByteOffset - valueReaderStartByteOffset));
+  }
+
+  @Override
+  public boolean isFixedOffsetMappingType() {
+    return false;
+  }
+
+  @Override
+  public void close() {
+    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible for closing the PinotDataBuffer.
+    _idReader.close();
+    _offsetReader.close();
+    _valueReader.close();
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ebea84c455..8900b6f36f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -51,6 +51,8 @@ import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
+import org.apache.pinot.spi.config.table.FieldConfig.EncodingType;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -943,8 +945,8 @@ public final class TableConfigUtils {
    * Also ensures proper dependency between index types (eg: Inverted Index columns
    * cannot be present in no-dictionary columns).
    */
-  private static void validateIndexingConfig(@Nullable IndexingConfig indexingConfig, @Nullable Schema schema) {
-    if (indexingConfig == null || schema == null) {
+  private static void validateIndexingConfig(IndexingConfig indexingConfig, @Nullable Schema schema) {
+    if (schema == null) {
       return;
     }
     ArrayListMultimap<String, String> columnNameToConfigMap = ArrayListMultimap.create();
@@ -1115,59 +1117,69 @@ public final class TableConfigUtils {
    * Validates index compatibility for forward index disabled columns
    */
   private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList,
-      @Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) {
-    if (fieldConfigList == null || schema == null) {
+      IndexingConfig indexingConfig, @Nullable Schema schema) {
+    if (fieldConfigList == null) {
       return;
     }
+    assert indexingConfig != null;
 
     for (FieldConfig fieldConfig : fieldConfigList) {
       String columnName = fieldConfig.getName();
-      FieldSpec fieldConfigColSpec = schema.getFieldSpecFor(columnName);
-      Preconditions.checkState(fieldConfigColSpec != null,
-          "Column Name " + columnName + " defined in field config list must be a valid column defined in the schema");
-
-      if (indexingConfigs != null) {
-        List<String> noDictionaryColumns = indexingConfigs.getNoDictionaryColumns();
-        switch (fieldConfig.getEncodingType()) {
-          case DICTIONARY:
-            if (noDictionaryColumns != null) {
-              Preconditions.checkArgument(!noDictionaryColumns.contains(columnName),
-                  "FieldConfig encoding type is different from indexingConfig for column: " + columnName);
-            }
-            Preconditions.checkArgument(fieldConfig.getCompressionCodec() == null,
-                "Set compression codec to null for dictionary encoding type");
-            break;
-          default:
-            break;
-        }
+      EncodingType encodingType = fieldConfig.getEncodingType();
+      Preconditions.checkArgument(encodingType != null, "Encoding type must be specified for column: %s", columnName);
+      CompressionCodec compressionCodec = fieldConfig.getCompressionCodec();
+      switch (encodingType) {
+        case RAW:
+          Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex(),
+              "Compression codec: %s is not applicable to raw index", compressionCodec);
+          break;
+        case DICTIONARY:
+          Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToDictEncodedIndex(),
+              "Compression codec: %s is not applicable to dictionary encoded index", compressionCodec);
+          List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
+          Preconditions.checkArgument(noDictionaryColumns == null || !noDictionaryColumns.contains(columnName),
+              "FieldConfig encoding type is different from indexingConfig for column: %s", columnName);
+          Map<String, String> noDictionaryConfig = indexingConfig.getNoDictionaryConfig();
+          Preconditions.checkArgument(noDictionaryConfig == null || !noDictionaryConfig.containsKey(columnName),
+              "FieldConfig encoding type is different from indexingConfig for column: %s", columnName);
+          break;
+        default:
+          break;
+      }
 
-        // Validate the forward index disabled compatibility with other indexes if enabled for this column
-        validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfigs, noDictionaryColumns,
-            schema);
+      if (schema == null) {
+        return;
       }
+      FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+      Preconditions.checkState(fieldSpec != null,
+          "Column: %s defined in field config list must be a valid column defined in the schema", columnName);
+
+      // Validate the forward index disabled compatibility with other indexes if enabled for this column
+      validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfig, schema);
 
       if (CollectionUtils.isNotEmpty(fieldConfig.getIndexTypes())) {
         for (FieldConfig.IndexType indexType : fieldConfig.getIndexTypes()) {
           switch (indexType) {
-            case FST:
-              Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
-                  "FST Index is only enabled on dictionary encoded columns");
-              Preconditions.checkState(fieldConfigColSpec.isSingleValueField()
-                      && fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
-                  "FST Index is only supported for single value string columns");
-              break;
             case INVERTED:
-              Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
-                  "Cannot create an Inverted Index on column: " + fieldConfig.getName() + ", specified as "
-                      + "a non dictionary column");
+              Preconditions.checkState(fieldConfig.getEncodingType() == EncodingType.DICTIONARY,
+                  "Cannot create inverted index on column: %s, it can only be applied to dictionary encoded columns",
+                  columnName);
               break;
             case TEXT:
-              Preconditions.checkState(fieldConfigColSpec.getDataType().getStoredType() == DataType.STRING,
-                  "TEXT Index is only supported for string columns");
+              Preconditions.checkState(fieldSpec.getDataType().getStoredType() == DataType.STRING,
+                  "Cannot create text index on column: %s, it can only be applied to string columns", columnName);
+              break;
+            case FST:
+              Preconditions.checkState(
+                  fieldConfig.getEncodingType() == EncodingType.DICTIONARY && fieldSpec.isSingleValueField()
+                      && fieldSpec.getDataType().getStoredType() == DataType.STRING,
+                  "Cannot create FST index on column: %s, it can only be applied to dictionary encoded single value "
+                      + "string columns", columnName);
               break;
             case TIMESTAMP:
-              Preconditions.checkState(fieldConfigColSpec.getDataType() == DataType.TIMESTAMP,
-                  "TIMESTAMP Index is only supported for timestamp columns");
+              Preconditions.checkState(fieldSpec.getDataType() == DataType.TIMESTAMP,
+                  "Cannot create timestamp index on column: %s, it can only be applied to timestamp columns",
+                  columnName);
               break;
             default:
               break;
@@ -1189,7 +1201,7 @@ public final class TableConfigUtils {
    * back or generate a new index for existing segments is to either refresh or back-fill the segments.
    */
   private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig,
-      IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) {
+      IndexingConfig indexingConfig, Schema schema) {
     Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
     if (fieldConfigProperties == null) {
       return;
@@ -1204,26 +1216,24 @@ public final class TableConfigUtils {
 
     FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
     // Check for the range index since the index itself relies on the existence of the forward index to work.
-    if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) {
+    if (indexingConfig.getRangeIndexColumns() != null && indexingConfig.getRangeIndexColumns().contains(columnName)) {
       Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("Feature not supported for multi-value "
           + "columns with range index. Cannot disable forward index for column %s. Disable range index on this "
           + "column to use this feature", columnName));
-      Preconditions.checkState(indexingConfigs.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+      Preconditions.checkState(indexingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
           String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable "
               + "forward index for column %s. Either disable range index or create range index with"
               + " version >= 2 to use this feature", columnName));
     }
 
-    Preconditions.checkState(
-        !indexingConfigs.isOptimizeDictionaryForMetrics() && !indexingConfigs.isOptimizeDictionary(), String.format(
+    Preconditions.checkState(!indexingConfig.isOptimizeDictionaryForMetrics() && !indexingConfig.isOptimizeDictionary(),
+        String.format(
             "Dictionary override optimization options (OptimizeDictionary, optimizeDictionaryForMetrics)"
                 + " not supported with forward index for column: %s, disabled", columnName));
 
-    boolean hasDictionary =
-        fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY || noDictionaryColumns == null
-            || !noDictionaryColumns.contains(columnName);
+    boolean hasDictionary = fieldConfig.getEncodingType() == EncodingType.DICTIONARY;
     boolean hasInvertedIndex =
-        indexingConfigs.getInvertedIndexColumns() != null && indexingConfigs.getInvertedIndexColumns()
+        indexingConfig.getInvertedIndexColumns() != null && indexingConfig.getInvertedIndexColumns()
             .contains(columnName);
 
     if (!hasDictionary || !hasInvertedIndex) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java
new file mode 100644
index 0000000000..12d3b2c352
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/FixedBitMVEntryDictForwardIndexTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.index.forward;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.io.writer.impl.FixedBitMVEntryDictForwardIndexWriter;
+import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class FixedBitMVEntryDictForwardIndexTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "FixedBitMVEntryDictForwardIndexTest");
+  private static final File INDEX_FILE =
+      new File(TEMP_DIR, "testColumn" + V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION);
+  private static final int NUM_DOCS = 1000;
+  private static final int MAX_NUM_VALUES_PER_MV_ENTRY = 3;
+  private static final Random RANDOM = new Random();
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(TEMP_DIR);
+  }
+
+  @Test
+  public void testRandomGeneratedValues()
+      throws Exception {
+    for (int numBitsPerValue = 1; numBitsPerValue <= 31; numBitsPerValue++) {
+      // Generate random values
+      int[][] valuesArray = new int[NUM_DOCS][];
+      int maxValue = numBitsPerValue != 31 ? 1 << numBitsPerValue : Integer.MAX_VALUE;
+      for (int i = 0; i < NUM_DOCS; i++) {
+        int numValues = RANDOM.nextInt(MAX_NUM_VALUES_PER_MV_ENTRY + 1);
+        int[] values = new int[numValues];
+        for (int j = 0; j < numValues; j++) {
+          values[j] = RANDOM.nextInt(maxValue);
+        }
+        valuesArray[i] = values;
+      }
+
+      // Create the forward index
+      try (
+          FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS,
+              numBitsPerValue)) {
+        for (int[] values : valuesArray) {
+          writer.putDictIds(values);
+        }
+      }
+
+      // Read the forward index
+      try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE);
+          FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS,
+              numBitsPerValue)) {
+        int[] valueBuffer = new int[MAX_NUM_VALUES_PER_MV_ENTRY];
+        for (int i = 0; i < NUM_DOCS; i++) {
+          int numValues = reader.getDictIdMV(i, valueBuffer, null);
+          assertEquals(numValues, valuesArray[i].length);
+          for (int j = 0; j < numValues; j++) {
+            assertEquals(valueBuffer[j], valuesArray[i][j]);
+          }
+        }
+      }
+
+      FileUtils.forceDelete(INDEX_FILE);
+    }
+  }
+
+  @Test
+  public void testAllEmptyValues()
+      throws Exception {
+    // Create the forward index
+    try (FixedBitMVEntryDictForwardIndexWriter writer = new FixedBitMVEntryDictForwardIndexWriter(INDEX_FILE, NUM_DOCS,
+        1)) {
+      int[] value = new int[0];
+      for (int i = 0; i < NUM_DOCS; i++) {
+        writer.putDictIds(value);
+      }
+    }
+
+    // Read the forward index
+    try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(INDEX_FILE);
+        FixedBitMVEntryDictForwardIndexReader reader = new FixedBitMVEntryDictForwardIndexReader(dataBuffer, NUM_DOCS,
+            1)) {
+      int[] valueBuffer = new int[0];
+      for (int i = 0; i < NUM_DOCS; i++) {
+        assertEquals(reader.getDictIdMV(i, valueBuffer, null), 0);
+      }
+    }
+
+    FileUtils.forceDelete(INDEX_FILE);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
index 2f438a5b6a..c72a95a4e9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexTypeTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.segment.local.segment.index.AbstractSerdeIndexContract;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -162,29 +163,17 @@ public class ForwardIndexTypeTest {
     }
 
     @Test
-    public void oldConfEnableDictWithSnappyCompression()
+    public void oldConfEnableDictWithMVEntryDictFormat()
         throws IOException {
       addFieldIndexConfig(""
-          + " {\n"
-          + "    \"name\": \"dimInt\","
-          + "    \"encodingType\": \"DICTIONARY\",\n"
-          + "    \"compressionCodec\": \"SNAPPY\"\n"
-          + " }"
+          + "{"
+          + "  \"name\": \"dimInt\","
+          + "  \"encodingType\": \"DICTIONARY\","
+          + "  \"compressionCodec\": \"MV_ENTRY_DICT\""
+          + "}"
       );
-      assertEquals(ForwardIndexConfig.DEFAULT);
-    }
-
-    @Test
-    public void oldConfEnableDictWithLZ4Compression()
-        throws IOException {
-      addFieldIndexConfig(""
-          + " {\n"
-          + "    \"name\": \"dimInt\","
-          + "    \"encodingType\": \"DICTIONARY\",\n"
-          + "    \"compressionCodec\": \"LZ4\"\n"
-          + " }"
-      );
-      assertEquals(ForwardIndexConfig.DEFAULT);
+      assertEquals(
+          new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
     }
 
     @Test
@@ -197,13 +186,7 @@ public class ForwardIndexTypeTest {
                   + " }"
       );
 
-      assertEquals(
-          new ForwardIndexConfig.Builder()
-              .withCompressionType(null)
-              .withDeriveNumDocsPerChunk(false)
-              .withRawIndexWriterVersion(ForwardIndexConfig.DEFAULT_RAW_WRITER_VERSION)
-              .build()
-      );
+      assertEquals(ForwardIndexConfig.DEFAULT);
     }
 
     @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
@@ -269,7 +252,7 @@ public class ForwardIndexTypeTest {
     }
 
     @Test
-    public void newConfigDisabled2()
+    public void newConfigDisabled()
         throws IOException {
       addFieldIndexConfig("{\n"
               + "    \"name\": \"dimInt\",\n"
@@ -296,6 +279,23 @@ public class ForwardIndexTypeTest {
       assertEquals(ForwardIndexConfig.DEFAULT);
     }
 
+    @Test
+    public void newConfigMVEntryDictFormat()
+        throws IOException {
+      addFieldIndexConfig(""
+          + "{"
+          + "  \"name\": \"dimInt\","
+          + "  \"indexes\" : {"
+          + "    \"forward\": {"
+          + "      \"dictIdCompressionType\": \"MV_ENTRY_DICT\""
+          + "    }"
+          + "  }"
+          + "}"
+      );
+      assertEquals(
+          new ForwardIndexConfig.Builder().withDictIdCompressionType(DictIdCompressionType.MV_ENTRY_DICT).build());
+    }
+
     @Test(dataProvider = "allChunkCompressionType", dataProviderClass = ForwardIndexTypeTest.class)
     public void newConfigEnabled(String compression)
         throws IOException {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index 2800cf9053..6077ca1625 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -44,6 +45,7 @@ import org.apache.pinot.segment.local.segment.store.SegmentLocalFSDirectory;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.IndexType;
@@ -53,6 +55,7 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -70,10 +73,7 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
 
 
 public class ForwardIndexHandlerTest {
@@ -104,7 +104,6 @@ public class ForwardIndexHandlerTest {
   // Sorted columns
   private static final String DIM_RAW_SORTED_INTEGER = "DIM_RAW_SORTED_INTEGER";
 
-
   // Metric columns
   private static final String METRIC_PASS_THROUGH_INTEGER = "METRIC_PASS_THROUGH_INTEGER";
   private static final String METRIC_SNAPPY_INTEGER = "METRIC_SNAPPY_INTEGER";
@@ -191,6 +190,9 @@ public class ForwardIndexHandlerTest {
       DIM_DICT_LONG, DIM_DICT_STRING, DIM_DICT_BYES, DIM_DICT_MV_BYTES, DIM_DICT_MV_STRING,
       DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG);
 
+  private static final List<String> DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX =
+      Arrays.asList(DIM_DICT_MV_INTEGER, DIM_DICT_MV_LONG, DIM_DICT_MV_STRING, DIM_DICT_MV_BYTES);
+
   private static final List<String> SV_FORWARD_INDEX_DISABLED_COLUMNS = Arrays.asList(
       DIM_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_SV_FORWARD_INDEX_DISABLED_LONG, DIM_SV_FORWARD_INDEX_DISABLED_STRING,
       DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
@@ -206,14 +208,16 @@ public class ForwardIndexHandlerTest {
   private static final List<String> FORWARD_INDEX_DISABLED_RAW_COLUMNS =
       Arrays.asList(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
 
+  private static final List<CompressionCodec> RAW_COMPRESSION_TYPES =
+      Arrays.stream(CompressionCodec.values()).filter(CompressionCodec::isApplicableToRawIndex)
+          .collect(Collectors.toList());
+
   private final List<String> _noDictionaryColumns = new ArrayList<>();
   private final List<String> _forwardIndexDisabledColumns = new ArrayList<>();
   private final List<String> _invertedIndexColumns = new ArrayList<>();
   TableConfig _tableConfig;
   Schema _schema;
   File _segmentDirectory;
-  private List<FieldConfig.CompressionCodec> _allCompressionTypes =
-      Arrays.asList(FieldConfig.CompressionCodec.values());
 
   @BeforeMethod
   public void setUp()
@@ -242,27 +246,27 @@ public class ForwardIndexHandlerTest {
 
     for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          FieldConfig.CompressionCodec.SNAPPY, null));
+          CompressionCodec.SNAPPY, null));
     }
 
     for (String indexColumn : RAW_SORTED_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW,
-          Collections.singletonList(FieldConfig.IndexType.SORTED), FieldConfig.CompressionCodec.SNAPPY, null));
+          Collections.singletonList(FieldConfig.IndexType.SORTED), CompressionCodec.SNAPPY, null));
     }
 
     for (String indexColumn : RAW_ZSTANDARD_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          FieldConfig.CompressionCodec.ZSTANDARD, null));
+          CompressionCodec.ZSTANDARD, null));
     }
 
     for (String indexColumn : RAW_PASS_THROUGH_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          FieldConfig.CompressionCodec.PASS_THROUGH, null));
+          CompressionCodec.PASS_THROUGH, null));
     }
 
     for (String indexColumn : RAW_LZ4_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
-          FieldConfig.CompressionCodec.LZ4, null));
+          CompressionCodec.LZ4, null));
     }
 
     for (String indexColumn : SV_FORWARD_INDEX_DISABLED_COLUMNS) {
@@ -284,8 +288,8 @@ public class ForwardIndexHandlerTest {
     }
 
     for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) {
-      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW,
-          Collections.emptyList(), FieldConfig.CompressionCodec.LZ4,
+      fieldConfigs.add(
+          new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(), CompressionCodec.LZ4,
           Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
     }
 
@@ -696,8 +700,9 @@ public class ForwardIndexHandlerTest {
         || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
         || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config = fieldConfigs.remove(randIdx);
-    FieldConfig.CompressionCodec newCompressionType = null;
-    for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+    CompressionCodec newCompressionType = null;
+    for (CompressionCodec type : CompressionCodec.values()) {
+
       if (config.getCompressionCodec() != type) {
         newCompressionType = type;
         break;
@@ -717,7 +722,7 @@ public class ForwardIndexHandlerTest {
     Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
     assertEquals(operationMap.get(config.getName()),
-        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
 
     // TEST2: Change compression and add index. Change compressionType for more than 1 column.
     fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
@@ -725,10 +730,10 @@ public class ForwardIndexHandlerTest {
     FieldConfig config2 = fieldConfigs.remove(1);
 
     FieldConfig newConfig1 = new FieldConfig(config1.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
-        FieldConfig.CompressionCodec.ZSTANDARD, null);
+        CompressionCodec.ZSTANDARD, null);
     fieldConfigs.add(newConfig1);
     FieldConfig newConfig2 = new FieldConfig(config2.getName(), FieldConfig.EncodingType.RAW, Collections.emptyList(),
-        FieldConfig.CompressionCodec.ZSTANDARD, null);
+        CompressionCodec.ZSTANDARD, null);
     fieldConfigs.add(newConfig2);
 
     tableConfig =
@@ -743,9 +748,9 @@ public class ForwardIndexHandlerTest {
     operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
     assertEquals(operationMap.get(config1.getName()),
-        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
     assertEquals(operationMap.get(config2.getName()),
-        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE));
 
     // Tear down
     segmentLocalFSDirectory.close();
@@ -963,8 +968,8 @@ public class ForwardIndexHandlerTest {
       name = fieldConfigs.get(randIdx).getName();
     } while (!SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) && !MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name));
     FieldConfig config = fieldConfigs.remove(randIdx);
-    FieldConfig.CompressionCodec newCompressionType = null;
-    for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
+    CompressionCodec newCompressionType = null;
+    for (CompressionCodec type : RAW_COMPRESSION_TYPES) {
       if (config.getCompressionCodec() != type) {
         newCompressionType = type;
         break;
@@ -1083,7 +1088,7 @@ public class ForwardIndexHandlerTest {
         continue;
       }
       // For every noDictionaryColumn, change the compressionType to all available types, one by one.
-      for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
+      for (CompressionCodec compressionType : RAW_COMPRESSION_TYPES) {
         // Setup
         SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
         SegmentDirectory segmentLocalFSDirectory =
@@ -1100,10 +1105,9 @@ public class ForwardIndexHandlerTest {
         }
         FieldConfig config = fieldConfigs.remove(index);
         String columnName = config.getName();
-        FieldConfig.CompressionCodec newCompressionType = compressionType;
 
         FieldConfig newConfig =
-            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), newCompressionType,
+            new FieldConfig(columnName, FieldConfig.EncodingType.RAW, Collections.emptyList(), compressionType,
                 null);
         fieldConfigs.add(newConfig);
 
@@ -1125,7 +1129,7 @@ public class ForwardIndexHandlerTest {
         ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
         testIndexExists(columnName, StandardIndexes.forward());
         validateIndexMap(columnName, false, false);
-        validateForwardIndex(columnName, newCompressionType, metadata.isSorted());
+        validateForwardIndex(columnName, compressionType, metadata.isSorted());
 
         // Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType
         // change.
@@ -1138,6 +1142,79 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  @Test
+  public void testChangeDictCompression()
+      throws Exception {
+    List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
+
+    // Change to MV_ENTRY_DICT compression
+    for (String column : DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX) {
+      FieldConfig newFieldConfig = new FieldConfig(column, FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+          CompressionCodec.MV_ENTRY_DICT, null);
+      fieldConfigs.add(newFieldConfig);
+      TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+          .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+
+      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+          ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) {
+        ForwardIndexHandler forwardIndexHandler =
+            new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null);
+
+        Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer);
+        assertEquals(operations, Collections.singletonMap(column,
+            Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)));
+        assertTrue(forwardIndexHandler.needUpdateIndices(writer));
+
+        forwardIndexHandler.updateIndices(writer);
+        forwardIndexHandler.postUpdateIndicesCleanup(writer);
+      }
+
+      segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+          ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) {
+        ForwardIndexReader<?> forwardIndexReader =
+            ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column));
+        assertTrue(forwardIndexReader.isDictionaryEncoded());
+        assertFalse(forwardIndexReader.isSingleValue());
+        assertEquals(forwardIndexReader.getDictIdCompressionType(), DictIdCompressionType.MV_ENTRY_DICT);
+      }
+    }
+
+    // Change back to regular forward index
+    for (int i = 0; i < DICT_ENABLED_MV_COLUMNS_WITH_FORWARD_INDEX.size(); i++) {
+      FieldConfig fieldConfig = fieldConfigs.remove(fieldConfigs.size() - 1);
+      String column = fieldConfig.getName();
+      TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+          .setNoDictionaryColumns(_noDictionaryColumns).setFieldConfigList(fieldConfigs).build();
+
+      SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+          ReadMode.mmap); SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter()) {
+        ForwardIndexHandler forwardIndexHandler =
+            new ForwardIndexHandler(segmentLocalFSDirectory, new IndexLoadingConfig(tableConfig, null), null);
+
+        Map<String, List<ForwardIndexHandler.Operation>> operations = forwardIndexHandler.computeOperations(writer);
+        assertEquals(operations, Collections.singletonMap(column,
+            Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_INDEX_COMPRESSION_TYPE)));
+        assertTrue(forwardIndexHandler.needUpdateIndices(writer));
+
+        forwardIndexHandler.updateIndices(writer);
+        forwardIndexHandler.postUpdateIndicesCleanup(writer);
+      }
+
+      segmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      try (SegmentDirectory segmentLocalFSDirectory = new SegmentLocalFSDirectory(_segmentDirectory, segmentMetadata,
+          ReadMode.mmap); SegmentDirectory.Reader reader = segmentLocalFSDirectory.createReader()) {
+        ForwardIndexReader<?> forwardIndexReader =
+            ForwardIndexType.read(reader, segmentMetadata.getColumnMetadataFor(column));
+        assertTrue(forwardIndexReader.isDictionaryEncoded());
+        assertFalse(forwardIndexReader.isSingleValue());
+        assertNull(forwardIndexReader.getDictIdCompressionType());
+      }
+    }
+  }
+
   @Test
   public void testChangeCompressionForMultipleColumns()
       throws Exception {
@@ -1149,8 +1226,8 @@ public class ForwardIndexHandlerTest {
 
     List<FieldConfig> fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
     Random rand = new Random();
-    int randomIdx = rand.nextInt(_allCompressionTypes.size());
-    FieldConfig.CompressionCodec newCompressionType = _allCompressionTypes.get(randomIdx);
+    int randomIdx = rand.nextInt(RAW_COMPRESSION_TYPES.size());
+    CompressionCodec newCompressionType = RAW_COMPRESSION_TYPES.get(randomIdx);
 
     // Column 1
     String name;
@@ -1497,7 +1574,7 @@ public class ForwardIndexHandlerTest {
       testIndexExists(column, StandardIndexes.forward());
       validateIndexMap(column, false, false);
       // All the columns are dimensions. So default compression type is LZ4.
-      validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+      validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
 
       // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
       validateMetadataProperties(column, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1538,7 +1615,7 @@ public class ForwardIndexHandlerTest {
     testIndexExists(column1, StandardIndexes.forward());
     validateIndexMap(column1, false, false);
     // All the columns are dimensions. So default compression type is LZ4.
-    validateForwardIndex(column1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+    validateForwardIndex(column1, CompressionCodec.LZ4, metadata.isSorted());
 
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
     validateMetadataProperties(column1, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1551,7 +1628,7 @@ public class ForwardIndexHandlerTest {
     testIndexExists(column2, StandardIndexes.forward());
     validateIndexMap(column2, false, false);
     // All the columns are dimensions. So default compression type is LZ4.
-    validateForwardIndex(column2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+    validateForwardIndex(column2, CompressionCodec.LZ4, metadata.isSorted());
 
     // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
     validateMetadataProperties(column2, false, 0, metadata.getCardinality(), metadata.getTotalDocs(),
@@ -1940,7 +2017,7 @@ public class ForwardIndexHandlerTest {
     // Col1 validation.
     ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(col1);
     validateIndexMap(col1, false, false);
-    validateForwardIndex(col1, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+    validateForwardIndex(col1, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, nothing should change.
     validateMetadataProperties(col1, false, 0, metadata.getCardinality(),
         metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(),
@@ -1950,7 +2027,7 @@ public class ForwardIndexHandlerTest {
     // Col2 validation.
     metadata = existingSegmentMetadata.getColumnMetadataFor(col2);
     validateIndexMap(col2, false, false);
-    validateForwardIndex(col2, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+    validateForwardIndex(col2, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, nothing should change.
     validateMetadataProperties(col2, false, 0, metadata.getCardinality(),
         metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), metadata.isSorted(),
@@ -1985,7 +2062,7 @@ public class ForwardIndexHandlerTest {
     // Column validation.
     ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
     validateIndexMap(column, false, false);
-    validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+    validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
     // In column metadata, some values can change since MV columns with duplicates lose the duplicates on forward index
     // regeneration.
     validateMetadataProperties(column, false, 0, metadata.getCardinality(),
@@ -2030,7 +2107,7 @@ public class ForwardIndexHandlerTest {
 
       ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
       validateIndexMap(column, false, false);
-      validateForwardIndex(column, FieldConfig.CompressionCodec.LZ4, metadata.isSorted());
+      validateForwardIndex(column, CompressionCodec.LZ4, metadata.isSorted());
 
       // In column metadata, nothing should change.
       validateMetadataProperties(column, false, 0,
@@ -2308,7 +2385,7 @@ public class ForwardIndexHandlerTest {
     }
   }
 
-  private void validateForwardIndex(String columnName, @Nullable FieldConfig.CompressionCodec expectedCompressionType,
+  private void validateForwardIndex(String columnName, @Nullable CompressionCodec expectedCompressionType,
       boolean isSorted)
       throws IOException {
     // Setup
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8e549d2ba3..3cc14682bc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -62,6 +62,7 @@ import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.segment.spi.utils.SegmentMetadataUtils;
 import org.apache.pinot.spi.config.table.BloomFilterConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -656,9 +657,8 @@ public class SegmentPreProcessorTest {
   @Test
   public void testForwardIndexHandlerChangeCompression()
       throws Exception {
-    Map<String, ChunkCompressionType> compressionConfigs = new HashMap<>();
-    ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD;
-    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    Map<String, CompressionCodec> compressionConfigs = new HashMap<>();
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD);
     _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
     _indexLoadingConfig.addNoDictionaryColumns(EXISTING_STRING_COL_RAW);
 
@@ -672,12 +672,11 @@ public class SegmentPreProcessorTest {
     new SegmentV1V2ToV3FormatConverter().convert(_indexDir);
 
     // Test2: Now forward index will be rewritten with ZSTANDARD compressionType.
-    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, newCompressionType, true,
-        0, DataType.STRING, 100000);
+    checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0,
+        ChunkCompressionType.ZSTANDARD, true, 0, DataType.STRING, 100000);
 
     // Test3: Change compression on existing raw index column. Also add text index on same column. Check correctness.
-    newCompressionType = ChunkCompressionType.SNAPPY;
-    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.SNAPPY);
     _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
     Set<String> textIndexColumns = new HashSet<>();
     textIndexColumns.add(EXISTING_STRING_COL_RAW);
@@ -688,12 +687,11 @@ public class SegmentPreProcessorTest {
     ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
     assertNotNull(columnMetadata);
     checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
-    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, newCompressionType, false, DataType.STRING, 100000);
+    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0,
+        ChunkCompressionType.SNAPPY, false, DataType.STRING, 100000);
 
     // Test4: Change compression on RAW index column. Change another index on another column. Check correctness.
-    newCompressionType = ChunkCompressionType.ZSTANDARD;
-    compressionConfigs.put(EXISTING_STRING_COL_RAW, newCompressionType);
+    compressionConfigs.put(EXISTING_STRING_COL_RAW, CompressionCodec.ZSTANDARD);
     _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
     Set<String> fstColumns = new HashSet<>();
     fstColumns.add(EXISTING_STRING_COL_DICT);
@@ -706,12 +704,11 @@ public class SegmentPreProcessorTest {
     // Check FST index
     checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
     // Check forward index.
-    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, newCompressionType, false, DataType.STRING, 100000);
+    validateIndex(StandardIndexes.forward(), EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0,
+        ChunkCompressionType.ZSTANDARD, false, DataType.STRING, 100000);
 
     // Test5: Change compressionType for an MV column
-    newCompressionType = ChunkCompressionType.ZSTANDARD;
-    compressionConfigs.put(EXISTING_INT_COL_RAW_MV, newCompressionType);
+    compressionConfigs.put(EXISTING_INT_COL_RAW_MV, CompressionCodec.ZSTANDARD);
     _indexLoadingConfig.setCompressionConfigs(compressionConfigs);
     _indexLoadingConfig.addNoDictionaryColumns(EXISTING_INT_COL_RAW_MV);
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 7653ff7636..9c9b2cdf43 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -31,6 +31,7 @@ import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.DedupConfig;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -706,9 +707,8 @@ public class TableConfigUtilsTest {
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m");
     streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
     ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-            .setIngestionConfig(ingestionConfig).build();
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+        .setIngestionConfig(ingestionConfig).build();
 
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -720,9 +720,8 @@ public class TableConfigUtilsTest {
     // When size based threshold is specified, rows has to be set to 0.
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000");
     ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+        .setIngestionConfig(ingestionConfig).build();
 
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -734,9 +733,8 @@ public class TableConfigUtilsTest {
     // When size based threshold is specified, rows has to be set to 0.
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0");
     ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
-    tableConfig =
-        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
+    tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+        .setIngestionConfig(ingestionConfig).build();
 
     try {
       TableConfigUtils.validate(tableConfig, schema);
@@ -1030,7 +1028,9 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since FST index is enabled on RAW encoding type");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create FST index on column: myCol1, it can only be applied to dictionary encoded single value "
+              + "string columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -1041,7 +1041,9 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since FST index is enabled on multi value column");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value "
+              + "string columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
@@ -1052,7 +1054,9 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since FST index is enabled on non String column");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "FST Index is only supported for single value string columns");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create FST index on column: intCol, it can only be applied to dictionary encoded single value "
+              + "string columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1064,7 +1068,8 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail since TEXT index is enabled on non String column");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "TEXT Index is only supported for string columns");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create text index on column: intCol, it can only be applied to string columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1077,29 +1082,29 @@ public class TableConfigUtilsTest {
       Assert.fail("Should fail since field name is not present in schema");
     } catch (Exception e) {
       Assert.assertEquals(e.getMessage(),
-          "Column Name myCol21 defined in field config list must be a valid column defined in the schema");
+          "Column: myCol21 defined in field config list must be a valid column defined in the schema");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     try {
       FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
-          FieldConfig.CompressionCodec.SNAPPY, null);
+          CompressionCodec.SNAPPY, null);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail("Should fail since dictionary encoding does not support compression codec snappy");
+      Assert.fail("Should fail since dictionary encoding does not support compression codec SNAPPY");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
+      Assert.assertEquals(e.getMessage(), "Compression codec: SNAPPY is not applicable to dictionary encoded index");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
     try {
-      FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
-          FieldConfig.CompressionCodec.ZSTANDARD, null);
+      FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW, Collections.emptyList(),
+          CompressionCodec.MV_ENTRY_DICT, null);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail("Should fail since dictionary encoding does not support compression codec zstandard");
+      Assert.fail("Should fail since raw encoding does not support compression codec MV_ENTRY_DICT");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
+      Assert.assertEquals(e.getMessage(), "Compression codec: MV_ENTRY_DICT is not applicable to raw index");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1227,7 +1232,7 @@ public class TableConfigUtilsTest {
       Assert.fail("Should not be able to disable dictionary but keep inverted index");
     } catch (Exception e) {
       Assert.assertEquals(e.getMessage(),
-          "Cannot create an Inverted index on column myCol2 specified in the " + "noDictionaryColumns config");
+          "Cannot create an Inverted index on column myCol2 specified in the noDictionaryColumns config");
     }
 
     // Tests the case when the field-config list marks a column as raw (non-dictionary) and enables
@@ -1242,7 +1247,7 @@ public class TableConfigUtilsTest {
       Assert.fail("Should not be able to disable dictionary but keep inverted index");
     } catch (Exception e) {
       Assert.assertEquals(e.getMessage(),
-          "Cannot create an Inverted Index on column: myCol2, specified as a non dictionary column");
+          "Cannot create inverted index on column: myCol2, it can only be applied to dictionary encoded columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1258,7 +1263,9 @@ public class TableConfigUtilsTest {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should not be able to disable dictionary but keep inverted index");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+      Assert.assertEquals(e.getMessage(),
+          "Cannot create FST index on column: myCol2, it can only be applied to dictionary encoded single value "
+              + "string columns");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1433,7 +1440,7 @@ public class TableConfigUtilsTest {
     }
 
     starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("myCol"), null, null,
-        Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", FieldConfig.CompressionCodec.LZ4)), 1);
+        Arrays.asList(new StarTreeAggregationConfig("myCol2", "SUM", CompressionCodec.LZ4)), 1);
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setStarTreeIndexConfigs(Arrays.asList(starTreeIndexConfig)).build();
     try {
@@ -1798,8 +1805,7 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
     } catch (IllegalStateException e) {
-      Assert.assertEquals(e.getMessage(),
-          "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
+      Assert.assertEquals(e.getMessage(), "The outOfOrderRecordColumn must be a single-valued BOOLEAN column");
     }
   }
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java
new file mode 100644
index 0000000000..82a60ed869
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/compression/DictIdCompressionType.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.compression;
+
+/**
+ * Compression type for dictionary-encoded forward index, where the values stored are dictionary ids.
+ */
+public enum DictIdCompressionType {
+  // Adds a second level of dictionary encoding for the multi-value entries. Args: (applicableToSV, applicableToMV)
+  MV_ENTRY_DICT(false, true);
+
+  private final boolean _applicableToSV; // flag returned by isApplicable(true), i.e. for single-value columns
+  private final boolean _applicableToMV; // flag returned by isApplicable(false), i.e. for multi-value columns
+
+  DictIdCompressionType(boolean applicableToSV, boolean applicableToMV) {
+    _applicableToSV = applicableToSV;
+    _applicableToMV = applicableToMV;
+  }
+
+  public boolean isApplicableToSV() {
+    return _applicableToSV;
+  }
+
+  public boolean isApplicableToMV() {
+    return _applicableToMV;
+  }
+
+  public boolean isApplicable(boolean isSingleValue) {
+    return isSingleValue ? isApplicableToSV() : isApplicableToMV(); // pick the flag matching the column kind
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
index 445dcc0b71..fcdbbe4fe0 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/ForwardIndexConfig.java
@@ -21,19 +21,19 @@ package org.apache.pinot.segment.spi.index;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.IndexConfig;
-import org.apache.pinot.spi.utils.JsonUtils;
 
 
 public class ForwardIndexConfig extends IndexConfig {
   public static final int DEFAULT_RAW_WRITER_VERSION = 2;
-  public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null);
+  public static final ForwardIndexConfig DISABLED = new ForwardIndexConfig(true, null, null, null, null);
   public static final ForwardIndexConfig DEFAULT = new Builder().build();
 
   @Nullable
@@ -41,15 +41,20 @@ public class ForwardIndexConfig extends IndexConfig {
   private final boolean _deriveNumDocsPerChunk;
   private final int _rawIndexWriterVersion;
 
+  @Nullable
+  private final DictIdCompressionType _dictIdCompressionType;
+
   @JsonCreator
-  public ForwardIndexConfig(@Nullable @JsonProperty("disabled") Boolean disabled,
-      @Nullable @JsonProperty("chunkCompressionType") ChunkCompressionType chunkCompressionType,
+  public ForwardIndexConfig(@JsonProperty("disabled") @Nullable Boolean disabled,
+      @JsonProperty("chunkCompressionType") @Nullable ChunkCompressionType chunkCompressionType,
       @JsonProperty("deriveNumDocsPerChunk") Boolean deriveNumDocsPerChunk,
-      @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion) {
+      @JsonProperty("rawIndexWriterVersion") Integer rawIndexWriterVersion,
+      @JsonProperty("dictIdCompressionType") @Nullable DictIdCompressionType dictIdCompressionType) {
     super(disabled);
     _chunkCompressionType = chunkCompressionType;
     _deriveNumDocsPerChunk = deriveNumDocsPerChunk != null && deriveNumDocsPerChunk;
     _rawIndexWriterVersion = rawIndexWriterVersion == null ? DEFAULT_RAW_WRITER_VERSION : rawIndexWriterVersion;
+    _dictIdCompressionType = dictIdCompressionType; // stored unchanged; null is allowed (parameter is @Nullable)
   }
 
   @Nullable
@@ -65,12 +70,17 @@ public class ForwardIndexConfig extends IndexConfig {
     return _rawIndexWriterVersion;
   }
 
+  @Nullable
+  public DictIdCompressionType getDictIdCompressionType() {
+    return _dictIdCompressionType; // null when no dict-id compression type was passed to the constructor
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof ForwardIndexConfig)) {
       return false;
     }
     if (!super.equals(o)) {
@@ -78,12 +88,14 @@ public class ForwardIndexConfig extends IndexConfig {
     }
     ForwardIndexConfig that = (ForwardIndexConfig) o;
     return _deriveNumDocsPerChunk == that._deriveNumDocsPerChunk
-        && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType;
+        && _rawIndexWriterVersion == that._rawIndexWriterVersion && _chunkCompressionType == that._chunkCompressionType
+        && Objects.equals(_dictIdCompressionType, that._dictIdCompressionType);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
+    return Objects.hash(super.hashCode(), _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
+        _dictIdCompressionType); // hashed as well because equals() now compares this field
   }
 
   public static class Builder {
@@ -92,6 +104,9 @@ public class ForwardIndexConfig extends IndexConfig {
     private boolean _deriveNumDocsPerChunk = false;
     private int _rawIndexWriterVersion = DEFAULT_RAW_WRITER_VERSION;
 
+    @Nullable
+    private DictIdCompressionType _dictIdCompressionType;
+
     public Builder() {
     }
 
@@ -99,6 +114,7 @@ public class ForwardIndexConfig extends IndexConfig {
       _chunkCompressionType = other.getChunkCompressionType();
       _deriveNumDocsPerChunk = other._deriveNumDocsPerChunk;
       _rawIndexWriterVersion = other._rawIndexWriterVersion;
+      _dictIdCompressionType = other._dictIdCompressionType;
     }
 
     public Builder withCompressionType(ChunkCompressionType chunkCompressionType) {
@@ -116,6 +132,39 @@ public class ForwardIndexConfig extends IndexConfig {
       return this;
     }
 
+    public Builder withDictIdCompressionType(DictIdCompressionType dictIdCompressionType) {
+      _dictIdCompressionType = dictIdCompressionType; // overwrites any value set earlier on this builder
+      return this; // returns the same builder so calls can be chained
+    }
+
+    public Builder withCompressionCodec(CompressionCodec compressionCodec) { // maps a codec onto the builder fields
+      if (compressionCodec == null) { // a null codec clears both compression settings
+        _chunkCompressionType = null;
+        _dictIdCompressionType = null;
+        return this;
+      }
+      switch (compressionCodec) { // NOTE(review): each case sets one field only; the other is left as-is - confirm
+        case PASS_THROUGH:
+          _chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
+          break;
+        case SNAPPY:
+          _chunkCompressionType = ChunkCompressionType.SNAPPY;
+          break;
+        case ZSTANDARD:
+          _chunkCompressionType = ChunkCompressionType.ZSTANDARD;
+          break;
+        case LZ4:
+          _chunkCompressionType = ChunkCompressionType.LZ4;
+          break;
+        case MV_ENTRY_DICT: // the only codec here that maps to a DictIdCompressionType
+          _dictIdCompressionType = DictIdCompressionType.MV_ENTRY_DICT;
+          break;
+        default: // reached only for codecs not listed above
+          throw new IllegalStateException("Unsupported compression codec: " + compressionCodec);
+      }
+      return this; // returns the same builder so calls can be chained
+    }
+
     public Builder withLegacyProperties(Map<String, Map<String, String>> propertiesByCol, String colName) {
       if (propertiesByCol != null) {
         Map<String, String> colProps = propertiesByCol.get(colName);
@@ -139,18 +188,8 @@ public class ForwardIndexConfig extends IndexConfig {
     }
 
     public ForwardIndexConfig build() {
-      return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion);
-    }
-  }
-
-  @Override
-  public String toString() {
-    try {
-      return JsonUtils.objectToString(this);
-    } catch (IOException ex) {
-      return "{" + "\"chunkCompressionType\":" + _chunkCompressionType
-          + ", \"deriveNumDocsPerChunk\":" + _deriveNumDocsPerChunk
-          + ", \"rawIndexWriterVersion\":" + _rawIndexWriterVersion + '}';
+      return new ForwardIndexConfig(false, _chunkCompressionType, _deriveNumDocsPerChunk, _rawIndexWriterVersion,
+          _dictIdCompressionType); // first argument (disabled) is always false for configs built here
     }
   }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 5882986edd..e067376472 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -25,6 +25,7 @@ import lombok.AllArgsConstructor;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
+import org.apache.pinot.segment.spi.compression.DictIdCompressionType;
 import org.apache.pinot.segment.spi.index.IndexReader;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -57,12 +58,20 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
   /**
    * Returns the compression type (if valid). Only valid for RAW forward index columns implemented in
    * BaseChunkForwardIndexReader.
-   * @return
    */
+  @Nullable // this default implementation returns null
   default ChunkCompressionType getCompressionType() {
     return null;
   }
 
+  /**
+   * Returns the compression type for dictionary encoded forward index. This default implementation returns null.
+   */
+  @Nullable
+  default DictIdCompressionType getDictIdCompressionType() {
+    return null;
+  }
+
   /**
    * Returns the length of the longest entry. Only valid for RAW forward index columns implemented in
    * BaseChunkForwardIndexReader. Returns -1 otherwise.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 30a42189a4..47a846d878 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -119,7 +119,29 @@ public class FieldConfig extends BaseJsonConfig {
   }
 
   public enum CompressionCodec {
-    PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4
+    PASS_THROUGH(true, false), // applicable to raw index only
+    SNAPPY(true, false), // applicable to raw index only
+    ZSTANDARD(true, false), // applicable to raw index only
+    LZ4(true, false), // applicable to raw index only
+
+    // For MV dictionary encoded forward index, add a second level dictionary encoding for the multi-value entries
+    MV_ENTRY_DICT(false, true); // applicable to dictionary encoded index only
+
+    private final boolean _applicableToRawIndex; // first constructor argument
+    private final boolean _applicableToDictEncodedIndex; // second constructor argument
+
+    CompressionCodec(boolean applicableToRawIndex, boolean applicableToDictEncodedIndex) {
+      _applicableToRawIndex = applicableToRawIndex;
+      _applicableToDictEncodedIndex = applicableToDictEncodedIndex;
+    }
+
+    public boolean isApplicableToRawIndex() {
+      return _applicableToRawIndex;
+    }
+
+    public boolean isApplicableToDictEncodedIndex() {
+      return _applicableToDictEncodedIndex;
+    }
   }
 
   public String getName() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org