You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/02/23 19:37:06 UTC

[pinot] branch master updated: Relax the constraint to have a dictionary and inverted index when forward index is disabled (#10260)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 02baaac1c1 Relax the constraint to have a dictionary and inverted index when forward index is disabled (#10260)
02baaac1c1 is described below

commit 02baaac1c157cd4a260ce6ad01a7063b07755adf
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Thu Feb 23 11:37:00 2023 -0800

    Relax the constraint to have a dictionary and inverted index when forward index is disabled (#10260)
---
 .../creator/impl/DefaultIndexCreatorProvider.java  |  26 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  | 242 ++++----
 .../creator/impl/fwd/NoOpForwardIndexCreator.java  |  64 ---
 .../segment/index/loader/BaseIndexHandler.java     |  12 +-
 .../segment/index/loader/ForwardIndexHandler.java  | 219 ++++---
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  35 +-
 .../segment/local/utils/TableConfigUtils.java      |  32 +-
 .../index/loader/ForwardIndexHandlerTest.java      | 640 ++++++++++++++++++---
 .../local/segment/index/loader/LoaderTest.java     |  11 +-
 .../index/loader/SegmentPreProcessorTest.java      | 227 ++++----
 .../segment/local/utils/TableConfigUtilsTest.java  |  58 +-
 11 files changed, 1033 insertions(+), 533 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index 6c8859d1ca..2debb77911 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -28,7 +28,6 @@ import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloo
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.fwd.NoOpForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -91,24 +90,17 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
       }
     } else {
       // Dictionary enabled columns
-      if (context.forwardIndexDisabled() && !context.isSorted()) {
-        // Forward index disabled columns which aren't sorted
-        // Sorted columns treat this option as a no-op
-        return new NoOpForwardIndexCreator(context.getFieldSpec().isSingleValueField());
-      } else {
-        // Forward index enabled columns
-        if (context.getFieldSpec().isSingleValueField()) {
-          if (context.isSorted()) {
-            return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-                context.getCardinality());
-          } else {
-            return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-                context.getCardinality(), context.getTotalDocs());
-          }
+      if (context.getFieldSpec().isSingleValueField()) {
+        if (context.isSorted()) {
+          return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+              context.getCardinality());
         } else {
-          return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-              context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
+          return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+              context.getCardinality(), context.getTotalDocs());
         }
+      } else {
+        return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+            context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index cbee9ad746..813db84e44 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -35,7 +35,6 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
 import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
@@ -206,8 +205,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           "Cannot create inverted index for raw index column: %s", columnName);
 
       boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName);
-      validateForwardIndexDisabledIndexCompatibility(columnName, forwardIndexDisabled, dictEnabledColumn,
-          columnIndexCreationInfo, invertedIndexColumns, rangeIndexColumns, rangeIndexVersion, fieldSpec);
 
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
@@ -226,8 +223,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       // Initialize forward index creator
       ChunkCompressionType chunkCompressionType =
           dictEnabledColumn ? null : getColumnCompressionType(segmentCreationSpec, fieldSpec);
-      _forwardIndexCreatorMap.put(columnName, _indexCreatorProvider.newForwardIndexCreator(
-          context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties())));
+      // Sorted columns treat the 'forwardIndexDisabled' flag as a no-op
+      _forwardIndexCreatorMap.put(columnName, (forwardIndexDisabled && !columnIndexCreationInfo.isSorted())
+          ? null : _indexCreatorProvider.newForwardIndexCreator(
+              context.forForwardIndex(chunkCompressionType, segmentCreationSpec.getColumnProperties())));
 
       // Initialize inverted index creator; skip creating inverted index if sorted
       if (invertedIndexColumns.contains(columnName) && !columnIndexCreationInfo.isSorted()) {
@@ -314,45 +313,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
-  /**
-   * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
-   * compatibility mismatch. The checks performed are:
-   *     - Validate dictionary is enabled.
-   *     - Validate inverted index is enabled.
-   *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
-   *       multi-value column (since multi-value defaults to index v1).
-   *
-   * @param columnName Name of the column
-   * @param forwardIndexDisabled Whether the forward index is disabled for column or not
-   * @param dictEnabledColumn Whether the column is dictionary enabled or not
-   * @param columnIndexCreationInfo Column index creation info
-   * @param invertedIndexColumns Set of columns with inverted index enabled
-   * @param rangeIndexColumns Set of columns with range index enabled
-   * @param rangeIndexVersion Range index version
-   * @param fieldSpec FieldSpec of column
-   */
-  private void validateForwardIndexDisabledIndexCompatibility(String columnName, boolean forwardIndexDisabled,
-      boolean dictEnabledColumn, ColumnIndexCreationInfo columnIndexCreationInfo, Set<String> invertedIndexColumns,
-      Set<String> rangeIndexColumns, int rangeIndexVersion, FieldSpec fieldSpec) {
-    if (!forwardIndexDisabled) {
-      return;
-    }
-
-    Preconditions.checkState(dictEnabledColumn,
-        String.format("Cannot disable forward index for column %s without dictionary", columnName));
-    Preconditions.checkState(invertedIndexColumns.contains(columnName),
-        String.format("Cannot disable forward index for column %s without inverted index enabled", columnName));
-    if (rangeIndexColumns.contains(columnName)) {
-      Preconditions.checkState(fieldSpec.isSingleValueField(),
-          String.format("Feature not supported for multi-value columns with range index. Cannot disable forward index "
-              + "for column %s. Disable range index on this column to use this feature", columnName));
-      Preconditions.checkState(rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION,
-          String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable "
-              + "forward index for column %s. Either disable range index or create range index with version >= 2 to "
-              + "use this feature", columnName));
-    }
-  }
-
   /**
    * Returns true if dictionary should be created for a column, false otherwise.
    * Currently there are two sources for this config:
@@ -590,7 +550,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           // get dictID from dictionary
           int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
           // store the docID -> dictID mapping in forward index
-          forwardIndexCreator.putDictId(dictId);
+          if (forwardIndexCreator != null) {
+            forwardIndexCreator.putDictId(dictId);
+          }
           DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
           if (invertedIndexCreator != null) {
             // if inverted index enabled during segment creation,
@@ -608,44 +570,48 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
               columnValueToIndex = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
             }
           }
-          switch (forwardIndexCreator.getValueType()) {
-            case INT:
-              forwardIndexCreator.putInt((int) columnValueToIndex);
-              break;
-            case LONG:
-              forwardIndexCreator.putLong((long) columnValueToIndex);
-              break;
-            case FLOAT:
-              forwardIndexCreator.putFloat((float) columnValueToIndex);
-              break;
-            case DOUBLE:
-              forwardIndexCreator.putDouble((double) columnValueToIndex);
-              break;
-            case BIG_DECIMAL:
-              forwardIndexCreator.putBigDecimal((BigDecimal) columnValueToIndex);
-              break;
-            case STRING:
-              forwardIndexCreator.putString((String) columnValueToIndex);
-              break;
-            case BYTES:
-              forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-              break;
-            case JSON:
-              if (columnValueToIndex instanceof String) {
+          if (forwardIndexCreator != null) {
+            switch (forwardIndexCreator.getValueType()) {
+              case INT:
+                forwardIndexCreator.putInt((int) columnValueToIndex);
+                break;
+              case LONG:
+                forwardIndexCreator.putLong((long) columnValueToIndex);
+                break;
+              case FLOAT:
+                forwardIndexCreator.putFloat((float) columnValueToIndex);
+                break;
+              case DOUBLE:
+                forwardIndexCreator.putDouble((double) columnValueToIndex);
+                break;
+              case BIG_DECIMAL:
+                forwardIndexCreator.putBigDecimal((BigDecimal) columnValueToIndex);
+                break;
+              case STRING:
                 forwardIndexCreator.putString((String) columnValueToIndex);
-              } else if (columnValueToIndex instanceof byte[]) {
+                break;
+              case BYTES:
                 forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
-              }
-              break;
-            default:
-              throw new IllegalStateException();
+                break;
+              case JSON:
+                if (columnValueToIndex instanceof String) {
+                  forwardIndexCreator.putString((String) columnValueToIndex);
+                } else if (columnValueToIndex instanceof byte[]) {
+                  forwardIndexCreator.putBytes((byte[]) columnValueToIndex);
+                }
+                break;
+              default:
+                throw new IllegalStateException();
+            }
           }
         }
       } else {
         if (dictionaryCreator != null) {
           //dictionary encoded
           int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
-          forwardIndexCreator.putDictIdMV(dictIds);
+          if (forwardIndexCreator != null) {
+            forwardIndexCreator.putDictIdMV(dictIds);
+          }
           DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
           if (invertedIndexCreator != null) {
             invertedIndexCreator.add(dictIds, dictIds.length);
@@ -653,74 +619,76 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         } else {
           // for text index on raw columns, check the config to determine if actual raw value should
           // be stored or not
-          if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
-            Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
-            if (value == null) {
-              value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
-            }
-            if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
-              columnValueToIndex = new String[]{String.valueOf(value)};
-            } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
-              columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)};
-            } else {
-              throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
-            }
-          }
-          Object[] values = (Object[]) columnValueToIndex;
-          int length = values.length;
-          switch (forwardIndexCreator.getValueType()) {
-            case INT:
-              int[] ints = new int[length];
-              for (int i = 0; i < length; i++) {
-                ints[i] = (Integer) values[i];
-              }
-              forwardIndexCreator.putIntMV(ints);
-              break;
-            case LONG:
-              long[] longs = new long[length];
-              for (int i = 0; i < length; i++) {
-                longs[i] = (Long) values[i];
+          if (forwardIndexCreator != null) {
+            if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
+              Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
+              if (value == null) {
+                value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
               }
-              forwardIndexCreator.putLongMV(longs);
-              break;
-            case FLOAT:
-              float[] floats = new float[length];
-              for (int i = 0; i < length; i++) {
-                floats[i] = (Float) values[i];
-              }
-              forwardIndexCreator.putFloatMV(floats);
-              break;
-            case DOUBLE:
-              double[] doubles = new double[length];
-              for (int i = 0; i < length; i++) {
-                doubles[i] = (Double) values[i];
-              }
-              forwardIndexCreator.putDoubleMV(doubles);
-              break;
-            case STRING:
-              if (values instanceof String[]) {
-                forwardIndexCreator.putStringMV((String[]) values);
+              if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
+                columnValueToIndex = new String[]{String.valueOf(value)};
+              } else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
+                columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)};
               } else {
-                String[] strings = new String[length];
+                throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
+              }
+            }
+            Object[] values = (Object[]) columnValueToIndex;
+            int length = values.length;
+            switch (forwardIndexCreator.getValueType()) {
+              case INT:
+                int[] ints = new int[length];
                 for (int i = 0; i < length; i++) {
-                  strings[i] = (String) values[i];
+                  ints[i] = (Integer) values[i];
                 }
-                forwardIndexCreator.putStringMV(strings);
-              }
-              break;
-            case BYTES:
-              if (values instanceof byte[][]) {
-                forwardIndexCreator.putBytesMV((byte[][]) values);
-              } else {
-                byte[][] bytesArray = new byte[length][];
+                forwardIndexCreator.putIntMV(ints);
+                break;
+              case LONG:
+                long[] longs = new long[length];
                 for (int i = 0; i < length; i++) {
-                  bytesArray[i] = (byte[]) values[i];
+                  longs[i] = (Long) values[i];
                 }
-                forwardIndexCreator.putBytesMV(bytesArray);
-              }
-              break;
-            default:
-              throw new IllegalStateException();
+                forwardIndexCreator.putLongMV(longs);
+                break;
+              case FLOAT:
+                float[] floats = new float[length];
+                for (int i = 0; i < length; i++) {
+                  floats[i] = (Float) values[i];
+                }
+                forwardIndexCreator.putFloatMV(floats);
+                break;
+              case DOUBLE:
+                double[] doubles = new double[length];
+                for (int i = 0; i < length; i++) {
+                  doubles[i] = (Double) values[i];
+                }
+                forwardIndexCreator.putDoubleMV(doubles);
+                break;
+              case STRING:
+                if (values instanceof String[]) {
+                  forwardIndexCreator.putStringMV((String[]) values);
+                } else {
+                  String[] strings = new String[length];
+                  for (int i = 0; i < length; i++) {
+                    strings[i] = (String) values[i];
+                  }
+                  forwardIndexCreator.putStringMV(strings);
+                }
+                break;
+              case BYTES:
+                if (values instanceof byte[][]) {
+                  forwardIndexCreator.putBytesMV((byte[][]) values);
+                } else {
+                  byte[][] bytesArray = new byte[length][];
+                  for (int i = 0; i < length; i++) {
+                    bytesArray[i] = (byte[]) values[i];
+                  }
+                  forwardIndexCreator.putBytesMV(bytesArray);
+                }
+                break;
+              default:
+                throw new IllegalStateException();
+            }
           }
         }
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
deleted file mode 100644
index bc990ac616..0000000000
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.segment.local.segment.creator.impl.fwd;
-
-import java.io.IOException;
-import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
-import org.apache.pinot.spi.data.FieldSpec;
-
-
-/**
- * Forward index creator for dictionary-encoded single and multi-value columns with forward index disabled.
- * This is a no-op.
- */
-public class NoOpForwardIndexCreator implements ForwardIndexCreator {
-  private final boolean _isSingleValue;
-
-  public NoOpForwardIndexCreator(boolean isSingleValue) {
-    _isSingleValue = isSingleValue;
-  }
-
-  @Override
-  public boolean isDictionaryEncoded() {
-    return true;
-  }
-
-  @Override
-  public boolean isSingleValue() {
-    return _isSingleValue;
-  }
-
-  @Override
-  public FieldSpec.DataType getValueType() {
-    return FieldSpec.DataType.INT;
-  }
-
-  @Override
-  public void putDictId(int dictId) {
-  }
-
-  @Override
-  public void putDictIdMV(int[] dictIds) {
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-  }
-}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
index f9f0f0d3ea..b4cbc7bcb8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/BaseIndexHandler.java
@@ -69,11 +69,17 @@ public abstract class BaseIndexHandler implements IndexHandler {
       return _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(columnName);
     }
 
-    // If forward index is disabled it means that it has to be dictionary based and the inverted index must exist.
+    // If forward index is disabled it means that it has to be dictionary based and the inverted index must exist to
+    // regenerate it. If either are missing the only way to get the forward index back and potentially use it to
+    // generate newly added indexes is to refresh or back-fill the forward index.
     Preconditions.checkState(segmentWriter.hasIndexFor(columnName, ColumnIndexType.DICTIONARY),
-        String.format("Forward index disabled column %s must have a dictionary", columnName));
+        String.format("Forward index disabled column %s must have a dictionary to regenerate the forward index. "
+            + "Regeneration of the forward index is required to create new indexes as well. Please refresh or "
+            + "back-fill the forward index", columnName));
     Preconditions.checkState(segmentWriter.hasIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
-        String.format("Forward index disabled column %s must have an inverted index", columnName));
+        String.format("Forward index disabled column %s must have an inverted index to regenerate the forward index. "
+            + "Regeneration of the forward index is required to create new indexes as well. Please refresh or "
+            + "back-fill the forward index", columnName));
 
     LOGGER.info("Rebuilding the forward index for column: {}, is temporary: {}", columnName, isTemporaryForwardIndex);
     InvertedIndexAndDictionaryBasedForwardIndexCreator invertedIndexAndDictionaryBasedForwardIndexCreator =
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
index 83d4d339b7..69c431fcdb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java
@@ -21,9 +21,9 @@ package org.apache.pinot.segment.local.segment.index.loader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -93,12 +93,10 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   private final Schema _schema;
 
   protected enum Operation {
-    DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
-    DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
-    ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
-    ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
-    ENABLE_DICTIONARY,
+    DISABLE_FORWARD_INDEX,
+    ENABLE_FORWARD_INDEX,
     DISABLE_DICTIONARY,
+    ENABLE_DICTIONARY,
     CHANGE_RAW_INDEX_COMPRESSION_TYPE,
   }
 
@@ -110,86 +108,81 @@ public class ForwardIndexHandler extends BaseIndexHandler {
   @Override
   public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
       throws Exception {
-    Map<String, Operation> columnOperationMap = computeOperation(segmentReader);
-    return !columnOperationMap.isEmpty();
+    Map<String, List<Operation>> columnOperationsMap = computeOperations(segmentReader);
+    return !columnOperationsMap.isEmpty();
   }
 
   @Override
   public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
       throws Exception {
-    Map<String, Operation> columnOperationMap = computeOperation(segmentWriter);
-    if (columnOperationMap.isEmpty()) {
+    Map<String, List<Operation>> columnOperationsMap = computeOperations(segmentWriter);
+    if (columnOperationsMap.isEmpty()) {
       return;
     }
 
-    for (Map.Entry<String, Operation> entry : columnOperationMap.entrySet()) {
+    for (Map.Entry<String, List<Operation>> entry : columnOperationsMap.entrySet()) {
       String column = entry.getKey();
-      Operation operation = entry.getValue();
-
-      switch (operation) {
-        case DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN: {
-          // Deletion of the forward index will be handled outside the index handler to ensure that other index
-          // handlers that need the forward index to construct their own indexes will have it available.
-          // The existing forward index must be in dictionary format for this to be a no-op.
-          _tmpForwardIndexColumns.add(column);
-          break;
-        }
-        case DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
-          // The forward index has been disabled for a column which has a noDictionary based forward index. A dictionary
-          // and inverted index need to be created before we can delete the forward index. We create a dictionary here,
-          // but let the InvertedIndexHandler handle the creation of the inverted index. We create a temporary
-          // forward index here which is dictionary based and allow the post deletion step handle the actual deletion
-          // of the forward index.
-          createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
-          if (!segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX)) {
-            throw new IOException(String.format("Temporary forward index was not created for column: %s", column));
-          }
-          _tmpForwardIndexColumns.add(column);
-          break;
-        }
-        case ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN: {
-          createForwardIndexIfNeeded(segmentWriter, column, indexCreatorProvider, false);
-          if (!segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
-            throw new IOException(
-                String.format("Dictionary should still exist after rebuilding forward index for dictionary column: %s",
-                    column));
-          }
-          break;
-        }
-        case ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
-          createForwardIndexIfNeeded(segmentWriter, column, indexCreatorProvider, false);
-          if (segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
-            throw new IOException(
-                String.format("Dictionary should not exist after rebuilding forward index for raw column: %s", column));
-          }
-          break;
+      List<Operation> operations = entry.getValue();
+
+      for (Operation operation : operations) {
+        switch (operation) {
+          case DISABLE_FORWARD_INDEX:
+            // Deletion of the forward index will be handled outside the index handler to ensure that other index
+            // handlers that need the forward index to construct their own indexes will have it available.
+            _tmpForwardIndexColumns.add(column);
+            break;
+          case ENABLE_FORWARD_INDEX:
+            ColumnMetadata columnMetadata = createForwardIndexIfNeeded(segmentWriter, column, indexCreatorProvider,
+                false);
+            if (columnMetadata.hasDictionary()) {
+              if (!segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
+                throw new IllegalStateException(String.format("Dictionary should still exist after rebuilding "
+                    + "forward index for dictionary column: %s", column));
+              }
+            } else {
+              if (segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
+                throw new IllegalStateException(
+                    String.format("Dictionary should not exist after rebuilding forward index for raw column: %s",
+                        column));
+              }
+            }
+            break;
+          case DISABLE_DICTIONARY:
+            Set<String> newForwardIndexDisabledColumns = _indexLoadingConfig.getForwardIndexDisabledColumns();
+            if (newForwardIndexDisabledColumns.contains(column)) {
+              removeDictionaryFromForwardIndexDisabledColumn(column, segmentWriter);
+              if (segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
+                throw new IllegalStateException(
+                    String.format("Dictionary should not exist after disabling dictionary for column: %s", column));
+              }
+            } else {
+              disableDictionaryAndCreateRawForwardIndex(column, segmentWriter, indexCreatorProvider);
+            }
+            break;
+          case ENABLE_DICTIONARY:
+            createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
+            if (!segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX)) {
+              throw new IllegalStateException(String.format("Forward index was not created for column: %s", column));
+            }
+            break;
+          case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
+            rewriteRawForwardIndexForCompressionChange(column, segmentWriter, indexCreatorProvider);
+            break;
+          default:
+            throw new IllegalStateException("Unsupported operation for column " + column);
         }
-        case ENABLE_DICTIONARY: {
-          createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
-          break;
-        }
-        case DISABLE_DICTIONARY: {
-          disableDictionaryAndCreateRawForwardIndex(column, segmentWriter, indexCreatorProvider);
-          break;
-        }
-        case CHANGE_RAW_INDEX_COMPRESSION_TYPE: {
-          rewriteRawForwardIndexForCompressionChange(column, segmentWriter, indexCreatorProvider);
-          break;
-        }
-        default:
-          throw new IllegalStateException("Unsupported operation for column " + column);
       }
     }
   }
 
   @VisibleForTesting
-  Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
+  Map<String, List<Operation>> computeOperations(SegmentDirectory.Reader segmentReader)
       throws Exception {
-    Map<String, Operation> columnOperationMap = new HashMap<>();
+    Map<String, List<Operation>> columnOperationsMap = new HashMap<>();
 
     // Does not work for segment versions < V3.
     if (_segmentDirectory.getSegmentMetadata().getVersion().compareTo(SegmentVersion.v3) < 0) {
-      return columnOperationMap;
+      return columnOperationsMap;
     }
 
     // From existing column config.
@@ -220,10 +213,6 @@ public class ForwardIndexHandler extends BaseIndexHandler {
     for (String column : existingAllColumns) {
       if (existingForwardIndexColumns.contains(column) && newForwardIndexDisabledColumns.contains(column)) {
         // Existing column has a forward index. New column config disables the forward index
-        Preconditions.checkState(!newNoDictColumns.contains(column),
-            String.format("Must enable dictionary to disable the forward index for column: %s", column));
-        Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
-            String.format("Must enable inverted index to disable the forward index for column: %s", column));
 
         ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
         if (columnMetadata.isSorted()) {
@@ -234,13 +223,33 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         }
 
         if (existingDictColumns.contains(column)) {
-          columnOperationMap.put(column, Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+          if (newNoDictColumns.contains(column)) {
+            // Dictionary was also disabled. Just disable the dictionary and remove it along with the forward index
+            // If range index exists, don't try to regenerate it on toggling the dictionary, throw an error instead
+            Preconditions.checkState(!_indexLoadingConfig.getRangeIndexColumns().contains(column),
+                String.format("Must disable range (%s) index to disable the dictionary and forward index for "
+                        + "column: %s or refresh / back-fill the forward index",
+                    _indexLoadingConfig.getRangeIndexColumns().contains(column) ? "enabled" : "disabled", column));
+            columnOperationsMap.put(column,
+                Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.DISABLE_DICTIONARY));
+          } else {
+            // Dictionary is still enabled, keep it but remove the forward index
+            columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_FORWARD_INDEX));
+          }
         } else {
-          columnOperationMap.put(column, Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+          if (newNoDictColumns.contains(column)) {
+            // Dictionary remains disabled and we should not reconstruct temporary forward index as dictionary based
+            columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_FORWARD_INDEX));
+          } else {
+            // Dictionary is enabled, creation of dictionary and conversion to dictionary based forward index is needed
+            columnOperationsMap.put(column,
+                Arrays.asList(Operation.DISABLE_FORWARD_INDEX, Operation.ENABLE_DICTIONARY));
+          }
         }
       } else if (existingForwardIndexDisabledColumns.contains(column)
           && !newForwardIndexDisabledColumns.contains(column)) {
         // Existing column does not have a forward index. New column config enables the forward index
+
         ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
         if (columnMetadata != null && columnMetadata.isSorted()) {
           // Check if the column is sorted. If sorted, disabling forward index should be a no-op and forward index
@@ -249,19 +258,45 @@ public class ForwardIndexHandler extends BaseIndexHandler {
           continue;
         }
 
+        // Get list of columns with inverted index
+        Set<String> existingInvertedIndexColumns =
+            segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.INVERTED_INDEX);
+        if (!existingDictColumns.contains(column) || !existingInvertedIndexColumns.contains(column)) {
+          // If either dictionary or inverted index is missing on the column there is no way to re-generate the forward
+          // index. Treat this as a no-op and log a warning.
+          LOGGER.warn("Trying to enable the forward index for a column {} missing either the dictionary ({}) and / or "
+                  + "the inverted index ({}) is not possible. Either a refresh or back-fill is required to get the "
+                  + "forward index, ignoring", column, existingDictColumns.contains(column) ? "enabled" : "disabled",
+              existingInvertedIndexColumns.contains(column) ? "enabled" : "disabled");
+          continue;
+        }
+
         if (newNoDictColumns.contains(column)) {
-          Preconditions.checkState(!_indexLoadingConfig.getInvertedIndexColumns().contains(column),
-              String.format("Must disable inverted index to enable the forward index as noDictionary for column: %s",
-                  column));
-          columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+          columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_FORWARD_INDEX));
         } else {
-          columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+          columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_FORWARD_INDEX));
         }
       } else if (existingForwardIndexDisabledColumns.contains(column)
           && newForwardIndexDisabledColumns.contains(column)) {
         // Forward index is disabled for the existing column and should remain disabled based on the latest config
-        Preconditions.checkState(existingDictColumns.contains(column) && !newNoDictColumns.contains(column),
-            String.format("Not allowed to disable the dictionary for a column: %s without forward index", column));
+        // Need some checks to see whether the dictionary is being enabled or disabled here and take appropriate actions
+
+        // If the dictionary is not enabled on the existing column it must be on the new noDictionary column list.
+        // Cannot enable the dictionary for a column with forward index disabled.
+        Preconditions.checkState(existingDictColumns.contains(column) || newNoDictColumns.contains(column),
+            String.format("Cannot regenerate the dictionary for column %s with forward index disabled. Please "
+                + "refresh or back-fill the data to add back the forward index", column));
+
+        if (existingDictColumns.contains(column) && newNoDictColumns.contains(column)) {
+          // Dictionary is currently enabled on this column but is supposed to be disabled. Remove the dictionary
+          // and update the segment metadata If the range index exists then throw an error since we are not
+          // regenerating the range index on toggling the dictionary
+          Preconditions.checkState(!_indexLoadingConfig.getRangeIndexColumns().contains(column),
+              String.format("Must disable range (%s) index to disable the dictionary for a forwardIndexDisabled "
+                      + "column: %s or refresh / back-fill the forward index",
+                  _indexLoadingConfig.getRangeIndexColumns().contains(column) ? "enabled" : "disabled", column));
+          columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
+        }
       } else if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) {
         // Existing column is RAW. New column is dictionary enabled.
         if (_schema == null || _indexLoadingConfig.getTableConfig() == null) {
@@ -274,21 +309,37 @@ public class ForwardIndexHandler extends BaseIndexHandler {
         ColumnMetadata existingColMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
         Preconditions.checkState(!existingColMetadata.isSorted(), "Raw column=" + column + " cannot be sorted.");
 
-        columnOperationMap.put(column, Operation.ENABLE_DICTIONARY);
+        columnOperationsMap.put(column, Collections.singletonList(Operation.ENABLE_DICTIONARY));
       } else if (existingDictColumns.contains(column) && newNoDictColumns.contains(column)) {
         // Existing column has dictionary. New config for the column is RAW.
         if (shouldDisableDictionary(column, _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column))) {
-          columnOperationMap.put(column, Operation.DISABLE_DICTIONARY);
+          columnOperationsMap.put(column, Collections.singletonList(Operation.DISABLE_DICTIONARY));
         }
       } else if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
         // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed.
         if (shouldChangeCompressionType(column, segmentReader)) {
-          columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+          columnOperationsMap.put(column, Collections.singletonList(Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
         }
       }
     }
 
-    return columnOperationMap;
+    return columnOperationsMap;
+  }
+
+  private void removeDictionaryFromForwardIndexDisabledColumn(String column, SegmentDirectory.Writer segmentWriter)
+      throws Exception {
+    // Remove the dictionary and update the metadata to indicate that the dictionary is no longer present
+    segmentWriter.removeIndex(column, ColumnIndexType.DICTIONARY);
+    String segmentName = _segmentDirectory.getSegmentMetadata().getName();
+    LOGGER.info("Removed dictionary for noForwardIndex column. Updating metadata properties for segment={} and "
+            + "column={}", segmentName, column);
+    Map<String, String> metadataProperties = new HashMap<>();
+    metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(false));
+    metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), String.valueOf(0));
+    SegmentMetadataUtils.updateMetadataProperties(_segmentDirectory, metadataProperties);
+
+    // Remove the inverted index, FST index and range index
+    removeDictRelatedIndexes(column, segmentWriter);
   }
 
   private boolean shouldDisableDictionary(String column, ColumnMetadata existingColumnMetadata) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 1adfbe03d1..bae3c5cbfe 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -41,7 +41,6 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCrea
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
-import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
@@ -408,33 +407,6 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     return true;
   }
 
-  /**
-   * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
-   * compatibility mismatch. The checks performed are:
-   *     - Validate dictionary is enabled.
-   *     - Validate inverted index is enabled.
-   *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
-   *       multi-value column (since multi-value defaults to index v1).
-   */
-  protected void validateForwardIndexDisabledConfigsIfPresent(String column, boolean forwardIndexDisabled) {
-    if (!forwardIndexDisabled) {
-      return;
-    }
-    FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
-    Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
-          String.format("Inverted index must be enabled for forward index disabled column: %s", column));
-      Preconditions.checkState(!_indexLoadingConfig.getNoDictionaryColumns().contains(column),
-          String.format("Dictionary disabled column: %s cannot disable the forward index", column));
-      if (_indexLoadingConfig.getRangeIndexColumns() != null
-          && _indexLoadingConfig.getRangeIndexColumns().contains(column)) {
-        Preconditions.checkState(fieldSpec.isSingleValueField(),
-            String.format("Multi-value column with range index: %s cannot disable the forward index", column));
-        Preconditions.checkState(_indexLoadingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
-            String.format("Single-value column with range index version < 2: %s cannot disable the forward index",
-                column));
-      }
-  }
-
   /**
    * Check and return whether the forward index is disabled for a given column
    */
@@ -456,9 +428,6 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     Object defaultValue = fieldSpec.getDefaultNullValue();
     boolean isSingleValue = fieldSpec.isSingleValueField();
     int maxNumberOfMultiValueElements = isSingleValue ? 0 : 1;
-    boolean forwardIndexDisabled = isForwardIndexDisabled(column);
-
-    validateForwardIndexDisabledConfigsIfPresent(column, forwardIndexDisabled);
 
     Object sortedArray;
     switch (dataType.getStoredType()) {
@@ -525,12 +494,14 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     } else {
       // Multi-value column.
 
+      boolean forwardIndexDisabled = isForwardIndexDisabled(column);
       if (forwardIndexDisabled) {
         // Generate an inverted index instead of forward index for multi-value columns when forward index is disabled
         try (DictionaryBasedInvertedIndexCreator creator = new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec,
             1, totalDocs, totalDocs)) {
+          int[] dictIds = new int[]{0};
           for (int docId = 0; docId < totalDocs; docId++) {
-            creator.add(0);
+            creator.add(dictIds, 1);
           }
           creator.seal();
         }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index e2cb821574..987d10de6c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -930,10 +930,13 @@ public final class TableConfigUtils {
   /**
    * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
    * compatibility mismatch. The checks performed are:
-   *     - Validate dictionary is enabled.
-   *     - Validate inverted index is enabled.
+   *
    *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
-   *       multi-value column (since mulit-value defaults to index v1).
+   *       multi-value column (since multi-value defaults to index v1).
+   *
+   * To rebuild the forward index when it has been disabled the dictionary and inverted index must be enabled for the
+   * given column. If either the inverted index or dictionary are disabled then the only way to get the forward index
+   * back or generate a new index for existing segments is to either refresh or back-fill the segments.
    */
   private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig,
       IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) {
@@ -942,18 +945,15 @@ public final class TableConfigUtils {
       return;
     }
 
-    boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties.get(FieldConfig.FORWARD_INDEX_DISABLED));
+    boolean forwardIndexDisabled =
+        Boolean.parseBoolean(fieldConfigProperties.getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED,
+            FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
     if (!forwardIndexDisabled) {
       return;
     }
 
     FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
-    Preconditions.checkState(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY
-            || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName),
-        String.format("Forward index disabled column %s must have dictionary enabled", columnName));
-    Preconditions.checkState(indexingConfigs.getInvertedIndexColumns() != null
-            && indexingConfigs.getInvertedIndexColumns().contains(columnName),
-        String.format("Forward index disabled column %s must have inverted index enabled", columnName));
+    // Check for the range index since the index itself relies on the existence of the forward index to work.
     if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) {
       Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("Feature not supported for multi-value "
           + "columns with range index. Cannot disable forward index for column %s. Disable range index on this "
@@ -963,6 +963,18 @@ public final class TableConfigUtils {
               + "forward index for column %s. Either disable range index or create range index with"
               + " version >= 2 to use this feature", columnName));
     }
+
+    boolean hasDictionary = fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY
+        || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName);
+    boolean hasInvertedIndex = indexingConfigs.getInvertedIndexColumns() != null
+        && indexingConfigs.getInvertedIndexColumns().contains(columnName);
+
+    if (!hasDictionary || !hasInvertedIndex) {
+      LOGGER.warn("Forward index has been disabled for column {}. Either dictionary ({}) and / or inverted index ({}) "
+              + "has been disabled. If the forward index needs to be regenerated or another index added please refresh "
+              + "or back-fill the forward index as it cannot be rebuilt without dictionary and inverted index.",
+          columnName, hasDictionary ? "enabled" : "disabled", hasInvertedIndex ? "enabled" : "disabled");
+    }
   }
 
   private static void sanitize(TableConfig tableConfig) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
index d43ad5badd..aca34113ee 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +59,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -122,7 +122,6 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_DICT_MV_STRING = "DIM_DICT_MV_STRING";
   private static final String DIM_DICT_MV_BYTES = "DIM_DICT_MV_BYTES";
 
-
   // Forward index disabled single-value columns
   private static final String DIM_SV_FORWARD_INDEX_DISABLED_INTEGER = "DIM_SV_FORWARD_INDEX_DISABLED_INTEGER";
   private static final String DIM_SV_FORWARD_INDEX_DISABLED_LONG = "DIM_SV_FORWARD_INDEX_DISABLED_LONG";
@@ -145,6 +144,20 @@ public class ForwardIndexHandlerTest {
   private static final String DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES =
       "DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES";
 
+  // Forward index disabled raw single-value column
+  private static final String DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER = "DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER";
+
+  // Forward index disabled raw multi-value column
+  private static final String DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER = "DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER";
+
+  // Forward index disabled dictionary enabled but inverted index disabled single-value column
+  private static final String DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX =
+      "DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX";
+
+  // Dictionary based forward index disabled column with a range index
+  private static final String DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX =
+      "DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX";
+
   private static final List<String> RAW_SNAPPY_INDEX_COLUMNS =
       Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, DIM_SNAPPY_BYTES, METRIC_SNAPPY_BIG_DECIMAL,
           METRIC_SNAPPY_INTEGER);
@@ -178,8 +191,12 @@ public class ForwardIndexHandlerTest {
       Arrays.asList(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_INTEGER, DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_LONG,
           DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING, DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES);
 
+  private static final List<String> FORWARD_INDEX_DISABLED_RAW_COLUMNS =
+      Arrays.asList(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+
   private final List<String> _noDictionaryColumns = new ArrayList<>();
   private final List<String> _forwardIndexDisabledColumns = new ArrayList<>();
+  private final List<String> _invertedIndexColumns = new ArrayList<>();
   TableConfig _tableConfig;
   Schema _schema;
   File _segmentDirectory;
@@ -208,7 +225,8 @@ public class ForwardIndexHandlerTest {
     List<FieldConfig> fieldConfigs = new ArrayList<>(
         RAW_SNAPPY_INDEX_COLUMNS.size() + RAW_ZSTANDARD_INDEX_COLUMNS.size() + RAW_PASS_THROUGH_INDEX_COLUMNS.size()
             + RAW_LZ4_INDEX_COLUMNS.size() + SV_FORWARD_INDEX_DISABLED_COLUMNS.size()
-            + MV_FORWARD_INDEX_DISABLED_COLUMNS.size() + MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size());
+            + MV_FORWARD_INDEX_DISABLED_COLUMNS.size() + MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.size()
+            + FORWARD_INDEX_DISABLED_RAW_COLUMNS.size() + 2);
 
     for (String indexColumn : RAW_SNAPPY_INDEX_COLUMNS) {
       fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW, Collections.emptyList(),
@@ -248,18 +266,42 @@ public class ForwardIndexHandlerTest {
           Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
     }
 
+    for (String indexColumn : FORWARD_INDEX_DISABLED_RAW_COLUMNS) {
+      fieldConfigs.add(new FieldConfig(indexColumn, FieldConfig.EncodingType.RAW,
+          Collections.emptyList(), FieldConfig.CompressionCodec.LZ4,
+          Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+    }
+
+    fieldConfigs.add(new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX,
+        FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
+        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+
+    fieldConfigs.add(new FieldConfig(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX,
+        FieldConfig.EncodingType.DICTIONARY, Collections.singletonList(FieldConfig.IndexType.RANGE), null,
+        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+
     _noDictionaryColumns.addAll(RAW_SNAPPY_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_ZSTANDARD_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_PASS_THROUGH_INDEX_COLUMNS);
     _noDictionaryColumns.addAll(RAW_LZ4_INDEX_COLUMNS);
+    _noDictionaryColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
 
     _forwardIndexDisabledColumns.addAll(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     _forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     _forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    _forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    _forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    _forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+
+    _invertedIndexColumns.addAll(SV_FORWARD_INDEX_DISABLED_COLUMNS);
+    _invertedIndexColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
+    _invertedIndexColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
 
     _tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(_noDictionaryColumns)
-            .setInvertedIndexColumns(_forwardIndexDisabledColumns).setFieldConfigList(fieldConfigs).build();
+            .setInvertedIndexColumns(_invertedIndexColumns)
+            .setRangeIndexColumns(Collections.singletonList(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX))
+            .setFieldConfigList(fieldConfigs).build();
     _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addSingleValueDimension(DIM_SNAPPY_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_PASS_THROUGH_STRING, FieldSpec.DataType.STRING)
@@ -286,7 +328,8 @@ public class ForwardIndexHandlerTest {
         .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING)
         .addSingleValueDimension(DIM_DICT_BYES, FieldSpec.DataType.BYTES)
         .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
-        .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT)
+        .addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT)
         .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT)
         .addMultiValueDimension(DIM_MV_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT)
         .addMultiValueDimension(DIM_MV_PASS_THROUGH_LONG, FieldSpec.DataType.LONG)
@@ -307,13 +350,21 @@ public class ForwardIndexHandlerTest {
         .addMultiValueDimension(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_INTEGER, FieldSpec.DataType.INT)
         .addMultiValueDimension(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_LONG, FieldSpec.DataType.LONG)
         .addMultiValueDimension(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_STRING, FieldSpec.DataType.STRING)
-        .addMultiValueDimension(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES, FieldSpec.DataType.BYTES).build();
+        .addMultiValueDimension(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_BYTES, FieldSpec.DataType.BYTES)
+        .addSingleValueDimension(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, FieldSpec.DataType.INT)
+        .addMultiValueDimension(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX, FieldSpec.DataType.INT)
+        .build();
 
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema);
     config.setOutDir(INDEX_DIR.getPath());
     config.setTableName(TABLE_NAME);
     config.setSegmentName(SEGMENT_NAME);
-    config.setInvertedIndexCreationColumns(_forwardIndexDisabledColumns);
+    config.setInvertedIndexCreationColumns(_invertedIndexColumns);
+    config.setForwardIndexDisabledColumns(_forwardIndexDisabledColumns);
+    config.setRangeIndexCreationColumns(
+        Collections.singletonList(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX));
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
     try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
       driver.init(config, recordReader);
@@ -457,12 +508,16 @@ public class ForwardIndexHandlerTest {
       row.putValue(DIM_SV_FORWARD_INDEX_DISABLED_LONG, tempLongRows[i]);
       row.putValue(DIM_SV_FORWARD_INDEX_DISABLED_STRING, tempStringRows[i]);
       row.putValue(DIM_SV_FORWARD_INDEX_DISABLED_BYTES, tempBytesRows[i]);
+      row.putValue(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, tempIntRows[i]);
+      row.putValue(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, tempIntRows[i]);
+      row.putValue(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX, tempIntRows[i]);
 
       // Forward index disabled MV columns
       row.putValue(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER, tempMVIntRowsForwardIndexDisabled[i]);
       row.putValue(DIM_MV_FORWARD_INDEX_DISABLED_LONG, tempMVLongRowsForwardIndexDisabled[i]);
       row.putValue(DIM_MV_FORWARD_INDEX_DISABLED_STRING, tempMVStringRowsForwardIndexDisabled[i]);
       row.putValue(DIM_MV_FORWARD_INDEX_DISABLED_BYTES, tempMVByteRowsForwardIndexDisabled[i]);
+      row.putValue(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, tempMVIntRowsForwardIndexDisabled[i]);
 
       // Forward index disabled MV columns with duplicates
       row.putValue(DIM_MV_FORWARD_INDEX_DISABLED_DUPLICATES_INTEGER, tempMVIntRows[i]);
@@ -486,8 +541,7 @@ public class ForwardIndexHandlerTest {
     // TEST1: Validate with zero changes. ForwardIndexHandler should be a No-Op.
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
-    Map<String, ForwardIndexHandler.Operation> operationMap = new HashMap<>();
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // Tear down
@@ -506,20 +560,22 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING);
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_ZSTANDARD_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.get(DIM_ZSTANDARD_STRING),
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
 
     // TEST2: Enable dictionary for an MV column.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().remove(DIM_MV_PASS_THROUGH_STRING);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING),
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
 
     // TEST3: Enable dictionary for a dict column. Should be a No-op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // TEST4: Add an additional text index. ForwardIndexHandler should be a No-Op.
@@ -527,7 +583,7 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER);
     indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // TEST5: Add text index and enable dictionary.
@@ -535,8 +591,9 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getRangeIndexColumns().add(METRIC_LZ4_INTEGER);
     indexLoadingConfig.getNoDictionaryColumns().remove(METRIC_LZ4_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(METRIC_LZ4_INTEGER), ForwardIndexHandler.Operation.ENABLE_DICTIONARY);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.get(METRIC_LZ4_INTEGER),
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
 
     // Tear down
     segmentLocalFSDirectory.close();
@@ -554,30 +611,32 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_SNAPPY_INTEGER);
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // TEST2: Disable dictionary for a dictionary SV column.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_DICT_INTEGER), ForwardIndexHandler.Operation.DISABLE_DICTIONARY);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.get(DIM_DICT_INTEGER),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_DICTIONARY));
 
     // TEST3: Disable dictionary for a dictionary MV column.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_MV_BYTES);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap.get(DIM_DICT_MV_BYTES), ForwardIndexHandler.Operation.DISABLE_DICTIONARY);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.get(DIM_DICT_MV_BYTES),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_DICTIONARY));
 
     // TEST4: Disable dictionary and enable inverted index. Should be a no-op.
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_STRING);
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_STRING);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
-    assertEquals(operationMap, Collections.emptyMap());
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // Tear down
     segmentLocalFSDirectory.close();
@@ -603,7 +662,10 @@ public class ForwardIndexHandlerTest {
       randIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name));
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
+        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config = fieldConfigs.remove(randIdx);
     FieldConfig.CompressionCodec newCompressionType = null;
     for (FieldConfig.CompressionCodec type : _allCompressionTypes) {
@@ -623,9 +685,10 @@ public class ForwardIndexHandlerTest {
 
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
-    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
-    assertEquals(operationMap.get(config.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+    assertEquals(operationMap.get(config.getName()),
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
 
     // TEST2: Change compression and add index. Change compressionType for more than 1 column.
     fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList());
@@ -648,10 +711,12 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getTextIndexColumns().add(config1.getName());
     indexLoadingConfig.getInvertedIndexColumns().add(config1.getName());
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
-    assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
-    assertEquals(operationMap.get(config2.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
+    assertEquals(operationMap.get(config1.getName()),
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
+    assertEquals(operationMap.get(config2.getName()),
+        Collections.singletonList(ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE));
 
     // Tear down
     segmentLocalFSDirectory.close();
@@ -670,7 +735,7 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
-    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // TEST2: Disable forward index for a dictionary column with forward index enabled
@@ -678,10 +743,10 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_DICT_INTEGER);
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
     assertEquals(operationMap.get(DIM_DICT_INTEGER),
-        ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX));
 
     // TEST3: Disable forward index for a raw column with forward index enabled and enable inverted index and
     // dictionary
@@ -690,9 +755,14 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getNoDictionaryColumns().remove(DIM_LZ4_INTEGER);
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_LZ4_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
-    assertEquals(operationMap.get(DIM_LZ4_INTEGER), ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+    Set<ForwardIndexHandler.Operation> operations = new HashSet<>(operationMap.get(DIM_LZ4_INTEGER));
+    assertEquals(operations.size(), 2);
+    Set<ForwardIndexHandler.Operation> expectedOperations =
+        new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
+            ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
+    assertEquals(expectedOperations, operations);
 
     // TEST4: Disable forward index for two dictionary columns with forward index enabled
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
@@ -701,11 +771,12 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_LONG);
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_STRING);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
-    assertEquals(operationMap.get(DIM_DICT_LONG), ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+    assertEquals(operationMap.get(DIM_DICT_LONG),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX));
     assertEquals(operationMap.get(DIM_DICT_STRING),
-        ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX));
 
     // TEST5: Disable forward index for two raw columns with forward index enabled and enable dictionary
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
@@ -716,11 +787,16 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getNoDictionaryColumns().remove(DIM_SNAPPY_STRING);
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_SNAPPY_STRING);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
-    assertEquals(operationMap.get(DIM_LZ4_LONG), ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
-    assertEquals(operationMap.get(DIM_SNAPPY_STRING),
-        ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+    operations = new HashSet<>(operationMap.get(DIM_LZ4_LONG));
+    assertEquals(operations.size(), 2);
+    expectedOperations = new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
+            ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
+    assertEquals(expectedOperations, operations);
+    operations = new HashSet<>(operationMap.get(DIM_SNAPPY_STRING));
+    assertEquals(operations.size(), 2);
+    assertEquals(expectedOperations, operations);
 
     // TEST6: Disable forward index for a dictionary and a raw column with forward index enabled
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
@@ -730,12 +806,111 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getInvertedIndexColumns().add(DIM_DICT_STRING);
     indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_DICT_STRING);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
-    assertEquals(operationMap.get(DIM_ZSTANDARD_INTEGER),
-        ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+    operations = new HashSet<>(operationMap.get(DIM_ZSTANDARD_INTEGER));
+    assertEquals(operations.size(), 2);
+    expectedOperations = new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
+        ForwardIndexHandler.Operation.ENABLE_DICTIONARY));
+    assertEquals(expectedOperations, operations);
     assertEquals(operationMap.get(DIM_DICT_STRING),
-        ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX));
+
+    // TEST7: Disable forward index for a raw column without enabling dictionary or inverted index
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_LZ4_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.size(), 1);
+    assertEquals(operationMap.get(DIM_LZ4_INTEGER),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX));
+
+    // TEST8: Disable forward index for a dictionary column and also disable dictionary and inverted index
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_DICT_INTEGER);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.size(), 1);
+    operations = new HashSet<>(operationMap.get(DIM_DICT_INTEGER));
+    assertEquals(operations.size(), 2);
+    expectedOperations = new HashSet<>(Arrays.asList(ForwardIndexHandler.Operation.DISABLE_FORWARD_INDEX,
+        ForwardIndexHandler.Operation.DISABLE_DICTIONARY));
+    assertEquals(expectedOperations, operations);
+
+    // TEST9: Disable dictionary on a column that already has forward index disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.getInvertedIndexColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.size(), 1);
+    assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_DICTIONARY));
+
+    // TEST10: Disable inverted index on a column that already has forward index disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.getInvertedIndexColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST11: Disable dictionary on a column that already has forward index disabled and inverted index disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap.size(), 1);
+    assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX),
+        Collections.singletonList(ForwardIndexHandler.Operation.DISABLE_DICTIONARY));
+
+    // TEST12: Enable dictionary on a column that already has forward index disabled and dictionary disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.getNoDictionaryColumns().remove(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    try {
+      operationMap = fwdIndexHandler.computeOperations(writer);
+      Assert.fail("Enabling dictionary on forward index disabled column is not possible");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Cannot regenerate the dictionary for column "
+          + "DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER with forward index disabled. Please refresh or back-fill "
+          + "the data to add back the forward index");
+    }
+
+    // TEST13: Disable dictionary on a column that already has forward index disabled without an inverted index but
+    // with a range index
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    try {
+      operationMap = fwdIndexHandler.computeOperations(writer);
+      Assert.fail("Disabling dictionary on forward index disabled column without inverted index but which has a "
+          + "range index is not possible");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Must disable range (enabled) index to disable the dictionary for a "
+          + "forwardIndexDisabled column: DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX or refresh / "
+          + "back-fill the forward index");
+    }
+
+    // TEST13: Disable dictionary on a column that already has forward index disabled and inverted index enabled with
+    // a range index
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getNoDictionaryColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.getRangeIndexColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
+    try {
+      operationMap = fwdIndexHandler.computeOperations(writer);
+      Assert.fail("Disabling dictionary on forward index disabled column with inverted index and a range index "
+          + "is not possible");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Must disable range (enabled) index to disable the dictionary for a "
+          + "forwardIndexDisabled column: DIM_SV_FORWARD_INDEX_DISABLED_INTEGER or refresh / back-fill the "
+          + "forward index");
+    }
   }
 
   @Test
@@ -774,26 +949,26 @@ public class ForwardIndexHandlerTest {
     noDictionaryColumns.add(config.getName());
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(noDictionaryColumns)
-            .setInvertedIndexColumns(_forwardIndexDisabledColumns).setFieldConfigList(fieldConfigs).build();
+            .setInvertedIndexColumns(_invertedIndexColumns).setFieldConfigList(fieldConfigs).build();
     tableConfig.setFieldConfigList(fieldConfigs);
 
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig);
     indexLoadingConfig.getNoDictionaryColumns().add(config.getName());
     indexLoadingConfig.getInvertedIndexColumns().remove(config.getName());
     ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, null);
-    Map<String, ForwardIndexHandler.Operation> operationMap = fwdIndexHandler.computeOperation(writer);
+    Map<String, List<ForwardIndexHandler.Operation>> operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
     assertEquals(operationMap.get(config.getName()),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
 
     // TEST2: Enable forward index in dictionary format for a column with forward index disabled
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
     indexLoadingConfig.getForwardIndexDisabledColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
     assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_BYTES),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
 
     // TEST3: Enable forward index in raw format for a column with forward index disabled. Remove column from inverted
     // index as well (inverted index needs dictionary)
@@ -802,10 +977,10 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER);
     indexLoadingConfig.getInvertedIndexColumns().remove(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 1);
     assertEquals(operationMap.get(DIM_MV_FORWARD_INDEX_DISABLED_INTEGER),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
 
     // TEST4: Enable forward index in dictionary format for two columns with forward index disabled. Disable inverted
     // index for one of them
@@ -814,12 +989,12 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getForwardIndexDisabledColumns().remove(DIM_MV_FORWARD_INDEX_DISABLED_STRING);
     indexLoadingConfig.getInvertedIndexColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_LONG);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
     assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_LONG),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
     assertEquals(operationMap.get(DIM_MV_FORWARD_INDEX_DISABLED_STRING),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
 
     // TEST5: Enable forward index in raw format for two columns with forward index disabled. Remove column from
     // inverted index as well (inverted index needs dictionary)
@@ -831,12 +1006,12 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getNoDictionaryColumns().add(DIM_MV_FORWARD_INDEX_DISABLED_LONG);
     indexLoadingConfig.getInvertedIndexColumns().remove(DIM_MV_FORWARD_INDEX_DISABLED_LONG);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
     assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_STRING),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
     assertEquals(operationMap.get(DIM_MV_FORWARD_INDEX_DISABLED_LONG),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
 
     // TEST6: Enable forward index in dictionary format and one in raw format for columns with forward index disabled
     indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
@@ -845,12 +1020,26 @@ public class ForwardIndexHandlerTest {
     indexLoadingConfig.getInvertedIndexColumns().remove(DIM_MV_FORWARD_INDEX_DISABLED_LONG);
     indexLoadingConfig.getForwardIndexDisabledColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_BYTES);
     fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
-    operationMap = fwdIndexHandler.computeOperation(writer);
+    operationMap = fwdIndexHandler.computeOperations(writer);
     assertEquals(operationMap.size(), 2);
     assertEquals(operationMap.get(DIM_MV_FORWARD_INDEX_DISABLED_LONG),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
     assertEquals(operationMap.get(DIM_SV_FORWARD_INDEX_DISABLED_BYTES),
-        ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
+        Collections.singletonList(ForwardIndexHandler.Operation.ENABLE_FORWARD_INDEX));
+
+    // TEST7: Enable forward index for a raw column with forward index disabled and keep it as raw
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().remove(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
+
+    // TEST8: Enable forward index for a dictionary based column with forward index and inverted index disabled
+    indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    indexLoadingConfig.getForwardIndexDisabledColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    fwdIndexHandler = new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    operationMap = fwdIndexHandler.computeOperations(writer);
+    assertEquals(operationMap, Collections.EMPTY_MAP);
 
     // Tear down
     segmentLocalFSDirectory.close();
@@ -860,6 +1049,10 @@ public class ForwardIndexHandlerTest {
   public void testChangeCompressionForSingleColumn()
       throws Exception {
     for (int i = 0; i < _noDictionaryColumns.size(); i++) {
+      if (FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(_noDictionaryColumns.get(i))) {
+        // Don't run this test for forward index disabled columns
+        continue;
+      }
       // For every noDictionaryColumn, change the compressionType to all available types, one by one.
       for (FieldConfig.CompressionCodec compressionType : _allCompressionTypes) {
         // Setup
@@ -938,7 +1131,10 @@ public class ForwardIndexHandlerTest {
       randomIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randomIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name));
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
+        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config1 = fieldConfigs.remove(randomIdx);
     String column1 = config1.getName();
     FieldConfig newConfig1 =
@@ -951,7 +1147,10 @@ public class ForwardIndexHandlerTest {
       randomIdx = rand.nextInt(fieldConfigs.size());
       name = fieldConfigs.get(randomIdx).getName();
     } while (SV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name) || MV_FORWARD_INDEX_DISABLED_COLUMNS.contains(name)
-        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name));
+        || MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS.contains(name)
+        || FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX.equals(name)
+        || DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX.equals(name));
     FieldConfig config2 = fieldConfigs.remove(randomIdx);
     String column2 = config2.getName();
     FieldConfig newConfig2 =
@@ -1065,6 +1264,10 @@ public class ForwardIndexHandlerTest {
     IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
 
     for (int i = 0; i < _noDictionaryColumns.size(); i++) {
+      if (FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(_noDictionaryColumns.get(i))) {
+        // Skip the RAW forward index disabled columns
+        continue;
+      }
       SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
       SegmentDirectory segmentLocalFSDirectory =
           new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
@@ -1159,6 +1362,9 @@ public class ForwardIndexHandlerTest {
     Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
     for (String column : DICT_ENABLED_COLUMNS_WITH_FORWARD_INDEX) {
       SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
       SegmentDirectory segmentLocalFSDirectory =
@@ -1168,7 +1374,11 @@ public class ForwardIndexHandlerTest {
       IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
       forwardIndexDisabledColumns.add(column);
       indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-      indexLoadingConfig.setInvertedIndexColumns(forwardIndexDisabledColumns);
+      Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+      invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+      indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
       ForwardIndexHandler fwdIndexHandler =
           new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
       IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
@@ -1352,7 +1562,14 @@ public class ForwardIndexHandlerTest {
     Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
     for (String column : _noDictionaryColumns) {
+      if (FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(column)) {
+        // Forward index already disabled for these columns, skip them
+        continue;
+      }
       SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
       SegmentDirectory segmentLocalFSDirectory =
           new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
@@ -1362,7 +1579,12 @@ public class ForwardIndexHandlerTest {
       forwardIndexDisabledColumns.add(column);
       indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
       indexLoadingConfig.getNoDictionaryColumns().removeAll(forwardIndexDisabledColumns);
-      indexLoadingConfig.setInvertedIndexColumns(forwardIndexDisabledColumns);
+      indexLoadingConfig.getNoDictionaryColumns().addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+      invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+      indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
       ForwardIndexHandler fwdIndexHandler =
           new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
       IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
@@ -1392,6 +1614,117 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  @Test
+  public void testDisableForwardIndexForRawAndInvertedIndexDisabledColumns()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    for (String column : _noDictionaryColumns) {
+      if (FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(column)) {
+        // Forward index already disabled for these columns, skip them
+        continue;
+      }
+      SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      SegmentDirectory segmentLocalFSDirectory =
+          new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+      SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+      forwardIndexDisabledColumns.add(column);
+      indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+      Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+      invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+      invertedIndexColumns.remove(column);
+      indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+      ForwardIndexHandler fwdIndexHandler =
+          new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+      IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+      fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+      fwdIndexHandler.postUpdateIndicesCleanup(writer);
+
+      // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
+      segmentLocalFSDirectory.close();
+
+      validateIndexMap(column, false, true);
+      validateIndexesForForwardIndexDisabledColumns(column);
+
+      // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
+      ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
+      FieldSpec.DataType dataType = metadata.getDataType();
+      validateMetadataProperties(column, false, 0, metadata.getCardinality(),
+          metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(),
+          metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+          metadata.getMinValue(), metadata.getMaxValue(), false);
+    }
+  }
+
+  @Test
+  public void testDisableForwardIndexForInvertedIndexDisabledColumns()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    Set<String> noDictColumnsToRemove = new HashSet<>();
+    noDictColumnsToRemove.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    for (String column : _noDictionaryColumns) {
+      if (FORWARD_INDEX_DISABLED_RAW_COLUMNS.contains(column)) {
+        // Forward index already disabled for these columns, skip them
+        continue;
+      }
+      SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+      SegmentDirectory segmentLocalFSDirectory =
+          new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+      SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+      IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+      forwardIndexDisabledColumns.add(column);
+      indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+      noDictColumnsToRemove.add(column);
+      indexLoadingConfig.getNoDictionaryColumns().removeAll(noDictColumnsToRemove);
+      Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+      invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+      invertedIndexColumns.remove(column);
+      indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+      ForwardIndexHandler fwdIndexHandler =
+          new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+      IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+      fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+      fwdIndexHandler.postUpdateIndicesCleanup(writer);
+
+      // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
+      segmentLocalFSDirectory.close();
+
+      validateIndexMap(column, true, true);
+      validateIndexesForForwardIndexDisabledColumns(column);
+
+      // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change.
+      ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column);
+      FieldSpec.DataType dataType = metadata.getDataType();
+      int dictionaryElementSize = 0;
+      if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) {
+        // This value is based on the rows in createTestData().
+        dictionaryElementSize = 7;
+      } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) {
+        dictionaryElementSize = 4;
+      }
+      validateMetadataProperties(column, true, dictionaryElementSize, metadata.getCardinality(),
+          metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(),
+          metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(),
+          metadata.getMinValue(), metadata.getMaxValue(), false);
+    }
+  }
+
   @Test
   public void testEnableForwardIndexInDictModeForMultipleForwardIndexDisabledColumns()
       throws Exception {
@@ -1479,6 +1812,9 @@ public class ForwardIndexHandlerTest {
     Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
     List<String> allForwardIndexDisabledColumns = new ArrayList<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     allForwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     for (String column : allForwardIndexDisabledColumns) {
@@ -1604,6 +1940,9 @@ public class ForwardIndexHandlerTest {
     Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
     List<String> allForwardIndexDisabledColumns = new ArrayList<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
     allForwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
     List<String> columnList = new ArrayList<>();
@@ -1616,7 +1955,11 @@ public class ForwardIndexHandlerTest {
       IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
       forwardIndexDisabledColumns.remove(column);
       indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
-      indexLoadingConfig.setInvertedIndexColumns(forwardIndexDisabledColumns);
+      Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+      invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+      invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+      indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
       columnList.add(column);
       indexLoadingConfig.getNoDictionaryColumns().addAll(columnList);
       ForwardIndexHandler fwdIndexHandler =
@@ -1641,6 +1984,118 @@ public class ForwardIndexHandlerTest {
     }
   }
 
+  @Test
+  public void testEnableForwardIndexForInvertedIndexDisabledColumn()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    forwardIndexDisabledColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+    invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    validateIndexMap(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, true, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+
+    ForwardIndexHandler fwdIndexHandler =
+        new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+    fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+    fwdIndexHandler.postUpdateIndicesCleanup(writer);
+
+    // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
+    segmentLocalFSDirectory.close();
+
+    // Validate nothing has changed
+    validateIndexMap(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, true, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+
+    // In column metadata, nothing should change.
+    ColumnMetadata metadata =
+        existingSegmentMetadata.getColumnMetadataFor(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    validateMetadataProperties(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX, metadata.hasDictionary(),
+        metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(),
+        metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(),
+        metadata.getMaxValue(), false);
+  }
+
+  @Test
+  public void testEnableForwardIndexForDictionaryDisabledColumns()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>(SV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_COLUMNS);
+    forwardIndexDisabledColumns.addAll(MV_FORWARD_INDEX_DISABLED_DUPLICATES_COLUMNS);
+    forwardIndexDisabledColumns.addAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    forwardIndexDisabledColumns.add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+    forwardIndexDisabledColumns.remove(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    forwardIndexDisabledColumns.remove(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = new HashSet<>(forwardIndexDisabledColumns);
+    invertedIndexColumns.removeAll(FORWARD_INDEX_DISABLED_RAW_COLUMNS);
+    invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    invertedIndexColumns.remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITH_RANGE_INDEX);
+    indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    validateIndexMap(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, false, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+    validateIndexMap(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, false, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+
+    ForwardIndexHandler fwdIndexHandler =
+        new ForwardIndexHandler(segmentLocalFSDirectory, indexLoadingConfig, _schema);
+    IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+    fwdIndexHandler.updateIndices(writer, indexCreatorProvider);
+    fwdIndexHandler.postUpdateIndicesCleanup(writer);
+
+    // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close()
+    segmentLocalFSDirectory.close();
+
+    // Validate nothing has changed
+    validateIndexMap(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, false, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    validateIndexMap(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, false, true);
+    validateIndexesForForwardIndexDisabledColumns(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+
+    // In column metadata, nothing should change.
+    ColumnMetadata metadata =
+        existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    validateMetadataProperties(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER, metadata.hasDictionary(),
+        metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(),
+        metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(),
+        metadata.getMaxValue(), false);
+    metadata =
+        existingSegmentMetadata.getColumnMetadataFor(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER);
+    validateMetadataProperties(DIM_RAW_MV_FORWARD_INDEX_DISABLED_INTEGER, metadata.hasDictionary(),
+        metadata.getColumnMaxLength(), metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(),
+        metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(),
+        metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(),
+        metadata.getMaxValue(), false);
+  }
+
   @Test
   public void testAddOtherIndexForForwardIndexDisabledColumn()
       throws Exception {
@@ -1697,6 +2152,45 @@ public class ForwardIndexHandlerTest {
     assertNotEquals(metadata.getTotalNumberOfEntries(), columnMetadata.getTotalNumberOfEntries());
   }
 
+  @Test
+  public void testAddOtherIndexWhenForwardIndexDisabledAndInvertedIndexOrDictionaryDisabled()
+      throws IOException {
+    SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
+    SegmentDirectory segmentLocalFSDirectory =
+        new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap);
+    SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig);
+
+    // Add a forward index and inverted index disabled column to the range index list
+    indexLoadingConfig.getRangeIndexColumns().add(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    RangeIndexHandler rangeIndexHandler = new RangeIndexHandler(segmentLocalFSDirectory, indexLoadingConfig);
+    IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+    try {
+      rangeIndexHandler.updateIndices(writer, indexCreatorProvider);
+      Assert.fail("Creating the range index on forward index and inverted index disabled column should fail");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Forward index disabled column "
+          + "DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX must have an inverted index to regenerate the "
+          + "forward index. Regeneration of the forward index is required to create new indexes as well. Please "
+          + "refresh or back-fill the forward index");
+    }
+
+    // Remove inverted index disabled column from range index list and add a raw column instead
+    indexLoadingConfig.getRangeIndexColumns().remove(DIM_SV_FORWARD_INDEX_DISABLED_INTEGER_WITHOUT_INV_IDX);
+    indexLoadingConfig.getRangeIndexColumns().add(DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER);
+    rangeIndexHandler = new RangeIndexHandler(segmentLocalFSDirectory, indexLoadingConfig);
+    indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
+    try {
+      rangeIndexHandler.updateIndices(writer, indexCreatorProvider);
+      Assert.fail("Creating the range index on forward index and inverted index disabled column should fail");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Forward index disabled column "
+          + "DIM_RAW_SV_FORWARD_INDEX_DISABLED_INTEGER must have a dictionary to regenerate the forward index. "
+          + "Regeneration of the forward index is required to create new indexes as well. Please refresh or back-fill "
+          + "the forward index");
+    }
+  }
+
   private void validateIndexesForForwardIndexDisabledColumns(String columnName)
       throws IOException {
     // Setup
@@ -1706,15 +2200,19 @@ public class ForwardIndexHandlerTest {
     SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter();
     ColumnMetadata columnMetadata = existingSegmentMetadata.getColumnMetadataFor(columnName);
 
-    assertTrue(writer.hasIndexFor(columnName, ColumnIndexType.DICTIONARY));
+    if (columnMetadata.hasDictionary()) {
+      assertTrue(writer.hasIndexFor(columnName, ColumnIndexType.DICTIONARY));
+      Dictionary dictionary = LoaderUtils.getDictionary(writer, columnMetadata);
+      assertEquals(columnMetadata.getCardinality(), dictionary.length());
+    } else {
+      assertFalse(writer.hasIndexFor(columnName, ColumnIndexType.DICTIONARY));
+    }
+
     if (columnMetadata.isSorted()) {
       assertTrue(writer.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX));
     } else {
       assertFalse(writer.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX));
     }
-
-    Dictionary dictionary = LoaderUtils.getDictionary(writer, columnMetadata);
-    assertEquals(columnMetadata.getCardinality(), dictionary.length());
   }
 
   private void validateForwardIndex(String columnName, @Nullable FieldConfig.CompressionCodec expectedCompressionType)
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index 50b8489fbd..bfe1b095ee 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -60,7 +60,6 @@ import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
 import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -523,20 +522,18 @@ public class LoaderTest {
     indexSegment.destroy();
 
     // Test scenarios to create a column with no forward index but without enabling inverted index for it
+    // This should still work as the original constraint to enforce that a column has a dictionary + inverted index
+    // has been relaxed.
     try {
       constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, false);
-      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
     } catch (IllegalStateException e) {
-      assertEquals(String.format("Cannot disable forward index for column %s without inverted index enabled",
-          NO_FORWARD_INDEX_COL_NAME), e.getMessage());
+      Assert.fail("Disabling forward index without enabling inverted index is allowed now");
     }
 
     try {
       constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, false);
-      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
     } catch (IllegalStateException e) {
-      assertEquals(String.format("Cannot disable forward index for column %s without inverted index enabled",
-          NO_FORWARD_INDEX_COL_NAME), e.getMessage());
+      Assert.fail("Disabling forward index without enabling inverted index is allowed now");
     }
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index ea3f8f3579..fb175619b6 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -895,8 +895,6 @@ public class SegmentPreProcessorTest {
       } else if (forwardIndexDisabled) {
         if (segmentMetadata.getVersion() == SegmentVersion.v3 || isAutoGenerated) {
           assertFalse(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
-          assertTrue(reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX));
-          assertTrue(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
         } else {
           // Updating dictionary or forward index for existing columns not supported for v1 segments yet
           assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
@@ -949,6 +947,16 @@ public class SegmentPreProcessorTest {
     }
   }
 
+  private void validateIndexExists(String column, ColumnIndexType indexType)
+      throws Exception {
+    try (SegmentDirectory segmentDirectory1 = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+        SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
+      assertTrue(reader.hasIndexFor(column, indexType));
+    }
+  }
+
   @Test
   public void testV1CreateInvertedIndices()
       throws Exception {
@@ -1933,15 +1941,11 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
-          + "forward index");
-    }
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    // Even raw columns are created with dictionary and forward index
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING, 100000);
 
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
         Collections.emptyList());
@@ -1950,15 +1954,11 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
-          + "forward index");
-    }
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    // Even raw columns are created with dictionary and forward index
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING, 100000);
 
     // Reset the no dictionary columns
     _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
@@ -2006,15 +2006,9 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Forward index cannot be disabled without enabling the inverted index!");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
-          + "newForwardIndexDisabledColumnSV");
-    }
+    // Disabling inverted index should be fine for disabling the forward index
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING, 100000);
 
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
         Collections.emptyList());
@@ -2023,15 +2017,10 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Should not be able to disable forward index without inverted index for column");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
-          + "newForwardIndexDisabledColumnSV");
-    }
+    // Disabling inverted index should be fine for disabling the forward index
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING,
+        100000);
 
     _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
   }
@@ -2078,7 +2067,9 @@ public class SegmentPreProcessorTest {
 
     // Add the column to the no dictionary column list
     Set<String> existingNoDictionaryColumns = _indexLoadingConfig.getNoDictionaryColumns();
-    _indexLoadingConfig.setNoDictionaryColumns(forwardIndexDisabledColumns);
+    _indexLoadingConfig.getNoDictionaryColumns().addAll(forwardIndexDisabledColumns);
+    // Disable the inverted index and validate that disabling the forward index goes through
+    _indexLoadingConfig.getInvertedIndexColumns().remove(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
 
     // Create a segment in V3, add a new column with no forward index enabled
     constructV3Segment();
@@ -2087,15 +2078,14 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Should not be able to disable forward index for raw column");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnMV cannot disable the "
-          + "forward index");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, false, false, 0, false, 1, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.INVERTED_INDEX);
+    validateIndexDoesNotExist(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.DICTIONARY);
+
+    // Re-enable the inverted index
+    _indexLoadingConfig.getInvertedIndexColumns().add(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
 
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
         Collections.emptyList());
@@ -2104,18 +2094,18 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Should not be able to disable forward index for raw column");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnMV cannot disable the "
-          + "forward index");
-    }
+    // For V1 segments reload doesn't support modifying the forward index yet. Since for MV columns we create a
+    // dictionary and inverted index in the default column handler, the forward index should indeed be disabled
+    // but we should still have the dictionary and inverted index.
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true, DataType.STRING,
+        100000);
+    validateIndexExists(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.DICTIONARY);
 
     // Reset the no dictionary columns
     _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
+    _indexLoadingConfig.getNoDictionaryColumns().removeAll(forwardIndexDisabledColumns);
 
     // Remove the column from the inverted index column list and validate that it fails
     invertedIndexColumns.removeAll(forwardIndexDisabledColumns);
@@ -2128,15 +2118,11 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Should not be able to disable forward index for raw column");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
-          + "newForwardIndexDisabledColumnMV");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.DICTIONARY);
 
     constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList(),
         Collections.emptyList());
@@ -2145,15 +2131,11 @@ public class SegmentPreProcessorTest {
     // should be null since column does not exist in the schema
     assertNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
-          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING,
-          100000);
-      Assert.fail("Should not be able to disable forward index for raw column");
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
-          + "newForwardIndexDisabledColumnMV");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, ColumnIndexType.DICTIONARY);
 
     _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
   }
@@ -2189,19 +2171,19 @@ public class SegmentPreProcessorTest {
 
     // Add the column to the noDictionaryColumns list
     _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_DICT);
+    // Also remove the column from the inverted index list and try again
+    _indexLoadingConfig.getInvertedIndexColumns().remove(EXISTING_STRING_COL_DICT);
 
     constructV3Segment();
     segmentMetadata = new SegmentMetadataImpl(_indexDir);
     columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
     assertNotNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
-          _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 26, true, 0, null, true, DataType.STRING,
-          100000);
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Must enable dictionary to disable the forward index for column: column5");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
+        _newColumnsSchemaWithForwardIndexDisabled, false, false, false, 0, true, 0, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_DICT, ColumnIndexType.INVERTED_INDEX);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_DICT, ColumnIndexType.DICTIONARY);
 
     // Reset the noDictionaryColumns list
     _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_DICT);
@@ -2214,13 +2196,11 @@ public class SegmentPreProcessorTest {
     columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
     assertNotNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
-          _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 26, true, 0, null, true, DataType.STRING,
-          100000);
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Must enable inverted index to disable the forward index for column: column5");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
+        _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 26, true, 0, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_DICT, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(EXISTING_STRING_COL_DICT, ColumnIndexType.DICTIONARY);
 
     // Reset the inverted index list
     _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_STRING_COL_DICT);
@@ -2243,9 +2223,13 @@ public class SegmentPreProcessorTest {
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
         _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 26, true, 0, null, true, DataType.STRING,
         100000);
+    validateIndexExists(EXISTING_STRING_COL_DICT, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(EXISTING_STRING_COL_DICT, ColumnIndexType.DICTIONARY);
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, COLUMN7_NAME, 359, 9,
         _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 0, false, 24, null, true, DataType.INT,
         134090);
+    validateIndexExists(COLUMN7_NAME, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(COLUMN7_NAME, ColumnIndexType.DICTIONARY);
 
     // Add another index on the column which has forward index disabled
     forwardIndexDisabledColumns = new HashSet<>();
@@ -2274,17 +2258,46 @@ public class SegmentPreProcessorTest {
     createAndValidateIndex(ColumnIndexType.FST_INDEX, EXISTING_STRING_COL_DICT, 9, 4,
         _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 26, true, 0, null, true, DataType.STRING,
         100000);
+    validateIndexExists(EXISTING_STRING_COL_DICT, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(EXISTING_STRING_COL_DICT, ColumnIndexType.DICTIONARY);
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, COLUMN1_NAME, 51594, 16,
         _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 0, true, 0, null, true, DataType.INT,
         100000);
     createAndValidateIndex(ColumnIndexType.RANGE_INDEX, COLUMN1_NAME, 51594, 16,
         _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 0, true, 0, null, true, DataType.INT,
         100000);
+    validateIndexExists(COLUMN1_NAME, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(COLUMN1_NAME, ColumnIndexType.DICTIONARY);
 
-    // Reset indexLoadingConfig to remove the column from additional index lists
+    // Reset indexLoadingConfig to remove the EXISTING_STRING_COL_DICT column from additional index lists
     _indexLoadingConfig.getTextIndexColumns().remove(EXISTING_STRING_COL_DICT);
     _indexLoadingConfig.getFSTIndexColumns().remove(EXISTING_STRING_COL_DICT);
+
+    // Remove COLUMN1_NAME from the dictionary list and inverted index column list while leaving it on the range list
+    // Test reload fails since dictionary is being disabled for a forward index disabled column and range index has
+    // to be changed.
+    _indexLoadingConfig.getNoDictionaryColumns().add(COLUMN1_NAME);
+    _indexLoadingConfig.getInvertedIndexColumns().remove(COLUMN1_NAME);
+
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(COLUMN1_NAME);
+    assertNotNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, COLUMN1_NAME, 51594, 16,
+          _newColumnsSchemaWithForwardIndexDisabled, false, false, false, 0, true, 0, null, true, DataType.INT,
+          100000);
+      Assert.fail("Should fail since we are disabling dictionary for forward index disabled column with range index");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Must disable range (enabled) index to disable the dictionary and forward "
+          + "index for column: column1 or refresh / back-fill the forward index");
+    }
+
+    // Reset indexLoadingConfig to remove the column from additional index lists
     _indexLoadingConfig.getRangeIndexColumns().remove(COLUMN1_NAME);
+    _indexLoadingConfig.getNoDictionaryColumns().remove(COLUMN1_NAME);
+    _indexLoadingConfig.getInvertedIndexColumns().add(COLUMN1_NAME);
   }
 
   /**
@@ -2331,13 +2344,11 @@ public class SegmentPreProcessorTest {
           _newColumnsSchemaWithForwardIndexDisabled, false, false, false, 0, true, 0, null, true, DataType.STRING,
           100000);
     } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Must enable dictionary to disable the forward index for column: column4");
+      assertEquals(e.getMessage(), "Must disable inverted index (enabled) and FST (disabled) index to disable "
+          + "the dictionary and forward index for column: column4");
     }
 
-    // Reset the noDictionaryColumns list
-    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
-
-    // Remove te column from the inverted index list
+    // Also remove the column from the inverted index list and validate that now it works
     _indexLoadingConfig.getInvertedIndexColumns().remove(EXISTING_STRING_COL_RAW);
 
     constructV3Segment();
@@ -2345,13 +2356,25 @@ public class SegmentPreProcessorTest {
     columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
     assertNotNull(columnMetadata);
 
-    try {
-      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3,
-          _newColumnsSchemaWithForwardIndexDisabled, false, false, false, 0, true, 0, null, true, DataType.STRING,
-          100000);
-    } catch (IllegalStateException e) {
-      assertEquals(e.getMessage(), "Must enable inverted index to disable the forward index for column: column4");
-    }
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3,
+        _newColumnsSchemaWithForwardIndexDisabled, false, false, false, 0, true, 0, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_RAW, ColumnIndexType.INVERTED_INDEX);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_RAW, ColumnIndexType.DICTIONARY);
+
+    // Reset just the noDictionaryColumns list and keep the column off the inverted index list
+    _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW);
+
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
+    assertNotNull(columnMetadata);
+
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3,
+        _newColumnsSchemaWithForwardIndexDisabled, false, true, false, 4, true, 0, null, true, DataType.STRING,
+        100000);
+    validateIndexDoesNotExist(EXISTING_STRING_COL_RAW, ColumnIndexType.INVERTED_INDEX);
+    validateIndexExists(EXISTING_STRING_COL_RAW, ColumnIndexType.DICTIONARY);
 
     // Reset the inverted index list
     _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_STRING_COL_RAW);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index adf81f9b30..88a829d846 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -918,29 +918,30 @@ public class TableConfigUtilsTest {
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
         .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
     try {
-      // Enable forward index disabled flag for a raw column
+      // Enable forward index disabled flag for a raw column. This should succeed as though the forward index cannot
+      // be rebuilt without a dictionary, the constraint to have a dictionary has been lifted.
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
       FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null,
           fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail("Should fail for myCol1 without dictionary and with forward index disabled");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol1 must have dictionary enabled");
+      Assert.fail("Validation should pass since forward index can be disabled for a column without a dictionary");
     }
 
     try {
-      // Enable forward index disabled flag for a column without inverted index
+      // Enable forward index disabled flag for a column without inverted index. This should succeed as though the
+      // forward index cannot be rebuilt without an inverted index, the constraint to have an inverted index has been
+      // lifted.
       Map<String, String> fieldConfigProperties = new HashMap<>();
       fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
       FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null,
           fieldConfigProperties);
       tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
       TableConfigUtils.validate(tableConfig, schema);
-      Assert.fail("Should fail for conflicting myCol2 with forward index disabled but no inverted index");
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol2 must have inverted index enabled");
+      Assert.fail("Validation should pass since forward index can be disabled for a column without an inverted index");
     }
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -1008,6 +1009,51 @@ public class TableConfigUtilsTest {
           + "< 2. Cannot disable forward index for column myCol1. Either disable range index or create range index "
           + "with version >= 2 to use this feature");
     }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol2")).setInvertedIndexColumns(Arrays.asList("myCol2")).build();
+    try {
+      // Enable forward index disabled flag for a column with inverted index and disable dictionary
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
+          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should not be able to disable dictionary but keep inverted index");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Cannot create an Inverted index on column myCol2 specified in the "
+          + "noDictionaryColumns config");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol2")).build();
+    try {
+      // Enable forward index disabled flag for a column with FST index and disable dictionary
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.RAW,
+          FieldConfig.IndexType.FST, null, null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should not be able to disable dictionary but keep inverted index");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("intCol")).setRangeIndexColumns(Arrays.asList("intCol")).build();
+    try {
+      // Enable forward index disabled flag for a column with FST index and disable dictionary
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("intCol", FieldConfig.EncodingType.RAW,
+          FieldConfig.IndexType.RANGE, null, null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.fail("Range index with forward index disabled no dictionary column is allowed");
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org