You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/10/12 05:24:16 UTC

[pinot] branch master updated: Add an option to disable the creation of the forward index for a column (#9333)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e813867985 Add an option to disable the creation of the forward index for a column (#9333)
e813867985 is described below

commit e813867985746e916c8e898a530002551b661496
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Tue Oct 11 22:24:11 2022 -0700

    Add an option to disable the creation of the forward index for a column (#9333)
    
    * Add the option to create columns without a forward index
    
    * Fix unit test
    
    * Set and unset flag in test
    
    * Empty commit to trigger tests
    
    * Empty commit to trigger tests
    
    * Trying test changes to see if unit tests pass
    
    * Add order by to tests for better determinism
    
    * Change disk size used in OfflineClusterIntegrationTest:testAggregateMetadataAPI
    
    * Empty commit to trigger tests
    
    * Address review comments
    
    * Empty commit to trigger tests
    
    * Address review comments, add file for MV forward index disabled tests
    
    * Enhance tests and add fixes and address some comments
    
    * Address review comments, remove no-op reader, allow sorted column, remove metadata
    
    * Address review comments
    
    * Address review comments
    
    * Empty-Commit
    
    * Address review comments, remove default column handling for forwardIndexDisabled columns
    
    * Fix formatting
    
    * Fix formatting
    
    * Update tests to check error message
    
    * Empty commit to trigger tests
    
    * Empty commit to trigger tests
    
    * Add checks for forwardIndexDisabled for derived column handling
    
    * Empty commit to trigger tests
    
    * Address review comments
    
    * Fix tests
---
 .../org/apache/pinot/core/common/DataFetcher.java  |    8 +-
 .../pinot/core/minion/RawIndexConverter.java       |   19 +-
 .../operator/filter/ScanBasedFilterOperator.java   |    4 +
 .../ForwardIndexDisabledMultiValueQueriesTest.java |  885 ++++++++++++++
 ...ForwardIndexDisabledSingleValueQueriesTest.java | 1217 ++++++++++++++++++++
 .../converter/stats/MutableColumnStatistics.java   |    3 +
 .../stats/MutableNoDictionaryColStatistics.java    |    3 +
 .../creator/impl/DefaultIndexCreatorProvider.java  |   28 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |   52 +
 .../creator/impl/fwd/NoOpForwardIndexCreator.java  |   64 +
 .../index/column/PhysicalColumnIndexContainer.java |   17 +-
 .../segment/index/datasource/BaseDataSource.java   |    2 +-
 .../segment/index/loader/IndexLoadingConfig.java   |   53 +-
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   78 ++
 .../forward/FixedBitMVForwardIndexReader.java      |    1 +
 .../segment/readers/PinotSegmentColumnReader.java  |    1 +
 .../startree/v2/store/StarTreeLoaderUtils.java     |    1 +
 .../segment/local/utils/TableConfigUtils.java      |   44 +
 .../local/segment/index/loader/LoaderTest.java     |  159 +++
 .../index/loader/SegmentPreProcessorTest.java      |  353 +++++-
 .../BaseDefaultColumnHandlerTest.java              |    2 +-
 .../segment/local/utils/TableConfigUtilsTest.java  |   94 ++
 .../newColumnsSchemaWithForwardIndexDisabled.json  |   77 ++
 .../segment/spi/creator/IndexCreationContext.java  |   25 +-
 .../spi/creator/SegmentGeneratorConfig.java        |   44 +
 .../apache/pinot/spi/config/table/FieldConfig.java |    3 +
 .../converter/DictionaryToRawIndexConverter.java   |   19 +-
 27 files changed, 3210 insertions(+), 46 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index f04e085a9e..4e2d13a72c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.common;
 
+import com.google.common.base.Preconditions;
 import java.io.Closeable;
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -66,10 +67,13 @@ public class DataFetcher {
     for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
       String column = entry.getKey();
       DataSource dataSource = entry.getValue();
+      DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
+      ForwardIndexReader<?> forwardIndexReader = dataSource.getForwardIndex();
+      Preconditions.checkState(forwardIndexReader != null,
+          "Forward index disabled for column: %s, cannot create DataFetcher!", column);
       ColumnValueReader columnValueReader =
-          new ColumnValueReader(dataSource.getForwardIndex(), dataSource.getDictionary());
+          new ColumnValueReader(forwardIndexReader, dataSource.getDictionary());
       _columnValueReaderMap.put(column, columnValueReader);
-      DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
       if (!dataSourceMetadata.isSingleValue()) {
         maxNumValuesPerMVEntry = Math.max(maxNumValuesPerMVEntry, dataSourceMetadata.getMaxNumValuesPerMVEntry());
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
index f6d2a7d2a2..49a786c340 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/RawIndexConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.minion;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
@@ -203,7 +204,9 @@ public class RawIndexConverter {
 
     // Create the raw index
     DataSource dataSource = _originalImmutableSegment.getDataSource(columnName);
-    ForwardIndexReader reader = dataSource.getForwardIndex();
+    ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
+    Preconditions.checkState(forwardIndexReader != null,
+        "Forward index disabled for column: %s, raw index conversion operation unsupported!", columnName);
     Dictionary dictionary = dataSource.getDictionary();
     assert dictionary != null;
     DataType storedType = dictionary.getValueType();
@@ -213,36 +216,36 @@ public class RawIndexConverter {
         IndexCreationContext.builder().withIndexDir(_convertedIndexDir).withColumnMetadata(columnMetadata)
             .withFieldSpec(new DimensionFieldSpec(columnName, storedType, columnMetadata.isSingleValue()))
             .withDictionary(false).build().forForwardIndex(ChunkCompressionType.LZ4, null));
-        ForwardIndexReaderContext readerContext = reader.createContext()) {
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
       switch (storedType) {
         case INT:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putInt(dictionary.getIntValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case LONG:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putLong(dictionary.getLongValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case FLOAT:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putFloat(dictionary.getFloatValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case DOUBLE:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putDouble(dictionary.getDoubleValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case STRING:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putString(dictionary.getStringValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case BYTES:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putBytes(dictionary.getBytesValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
index 126431c9ce..89305d24fb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ScanBasedFilterOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.filter;
 
+import com.google.common.base.Preconditions;
 import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.core.common.Operator;
@@ -43,6 +44,9 @@ public class ScanBasedFilterOperator extends BaseFilterOperator {
     _dataSource = dataSource;
     _numDocs = numDocs;
     _nullHandlingEnabled = nullHandlingEnabled;
+    Preconditions.checkState(_dataSource.getForwardIndex() != null,
+        "Forward index disabled for column: %s, scan based filtering not supported!",
+        _dataSource.getDataSourceMetadata().getFieldSpec().getName());
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
new file mode 100644
index 0000000000..a1c0418b33
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledMultiValueQueriesTest.java
@@ -0,0 +1,885 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexDisabledMultiValueQueriesTest</code> class sets up the index segment for the no forward
+ * index multi-value queries test.
+ * <p>There are totally 14 columns, 100000 records inside the original Avro file where 10 columns are selected to build
+ * the index segment. Selected columns information are as following:
+ * <ul>
+ *   ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex, IsMultiValue, FwdIndexDisabled: S1, S2
+ *   <li>column1, METRIC, INT, 51594, F, F, F, F, F</li>
+ *   <li>column2, METRIC, INT, 42242, F, F, F, F, F</li>
+ *   <li>column3, DIMENSION, STRING, 5, F, T, F, F, F</li>
+ *   <li>column5, DIMENSION, STRING, 9, F, F, F, F, F</li>
+ *   <li>column6, DIMENSION, INT, 18499, F, T, T, T, T</li>
+ *   <li>column7, DIMENSION, INT, 359, F, T, T, T, F</li>
+ *   <li>column8, DIMENSION, INT, 850, F, T, F, F, F</li>
+ *   <li>column9, METRIC, INT, 146, F, T, F, F, F</li>
+ *   <li>column10, METRIC, INT, 3960, F, F, F, F, F</li>
+ *   <li>daysSinceEpoch, TIME, INT, 1, T, F, F, F, F</li>
+ * </ul>
+ */
+public class ForwardIndexDisabledMultiValueQueriesTest extends BaseQueriesTest {
+  private static final String AVRO_DATA = "data" + File.separator + "test_data-mv.avro";
+  private static final String SEGMENT_NAME_1 = "testTable_1756015688_1756015688";
+  private static final String SEGMENT_NAME_2 = "testTable_1756015689_1756015689";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
+      "ForwardIndexDisabledMultiValueQueriesTest");
+
+  private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+
+  // Hard-coded query filter.
+  protected static final String FILTER = " WHERE column1 > 100000000"
+      + " AND column2 BETWEEN 20000000 AND 1000000000"
+      + " AND column3 <> 'w'"
+      + " AND (column6 < 500000 OR column7 NOT IN (225, 407))"
+      + " AND daysSinceEpoch = 1756015683";
+
+  private IndexSegment _indexSegment;
+  // Contains 2 identical index segments.
+  private List<IndexSegment> _indexSegments;
+
+  private TableConfig _tableConfig;
+  private List<String> _invertedIndexColumns;
+  private List<String> _forwardIndexDisabledColumns;
+
+  @BeforeClass
+  public void buildSegment()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    // Get resource file path.
+    URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
+    assertNotNull(resource);
+    String filePath = resource.getFile();
+
+    // Build the segment schema.
+    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+        .addMetric("column2", FieldSpec.DataType.INT).addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("column6", FieldSpec.DataType.INT)
+        .addMultiValueDimension("column7", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column8", FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
+        .addMetric("column10", FieldSpec.DataType.INT)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+
+    createSegment(filePath, SEGMENT_NAME_1, schema);
+    createSegment(filePath, SEGMENT_NAME_2, schema);
+
+    ImmutableSegment immutableSegment1 = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment2 = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+
+    _indexSegment = immutableSegment1;
+    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+  }
+
+  private void createSegment(String filePath, String segmentName, Schema schema)
+      throws Exception {
+    // Create field configs for the no forward index columns
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    fieldConfigList.add(new FieldConfig("column6", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
+        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+    if (segmentName.equals(SEGMENT_NAME_1)) {
+      fieldConfigList.add(new FieldConfig("column7", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+
+      // Build table config based on segment 1 as it contains both columns under no forward index
+      _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setNoDictionaryColumns(Arrays.asList("column5"))
+          .setTableName("testTable").setTimeColumnName("daysSinceEpoch").setFieldConfigList(fieldConfigList).build();
+    }
+
+    // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    _invertedIndexColumns = Arrays.asList("column3", "column6", "column7", "column8", "column9");
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", "column7"));
+    segmentGeneratorConfig.setForwardIndexDisabledColumns(_forwardIndexDisabledColumns);
+    // The segment generation code in SegmentColumnarIndexCreator will throw
+    // exception if start and end time in time column are not in acceptable
+    // range. For this test, we first need to fix the input avro data
+    // to have the time column values in allowed range. Until then, the check
+    // is explicitly disabled
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+  }
+
+  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(_tableConfig);
+    indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+    indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>(_forwardIndexDisabledColumns));
+    indexLoadingConfig.setReadMode(ReadMode.heap);
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
+        indexLoadingConfig);
+
+    Map<String, ColumnMetadata> columnMetadataMap1 = immutableSegment.getSegmentMetadata().getColumnMetadataMap();
+    columnMetadataMap1.forEach((column, metadata) -> {
+      if (column.equals("column6") || column.equals("column7")) {
+        assertTrue(metadata.hasDictionary());
+        assertFalse(metadata.isSingleValue());
+        assertNull(immutableSegment.getForwardIndex(column));
+      } else {
+        assertNotNull(immutableSegment.getForwardIndex(column));
+      }
+    });
+
+    return immutableSegment;
+  }
+
+  @AfterClass
+  public void deleteAndDestroySegment() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    _indexSegments.forEach((IndexSegment::destroy));
+  }
+
+  @Override
+  protected String getFilter() {
+    return FILTER;
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @Test
+  public void testSelectStarQueries() {
+    // Select * without any filters
+    try {
+      getBrokerResponse(SELECT_STAR_QUERY);
+      Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Forward index disabled for column:")
+          && e.getMessage().contains("cannot create DataFetcher!"));
+    }
+
+    // Select * with filters
+    try {
+      getBrokerResponse(SELECT_STAR_QUERY + FILTER);
+      Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Forward index disabled for column:")
+          && e.getMessage().contains("scan based filtering not supported!"));
+    }
+  }
+
+  @Test
+  public void testSelectQueries() {
+    {
+      // Selection query without filters including a column with forwardIndexDisabled enabled on both segments
+      String query = "SELECT column1, column5, column6, column9, column10 FROM testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query without filters including a column with forwardIndexDisabled enabled on one segment
+      String query = "SELECT column1, column5, column7, column9, column10 FROM testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query with filters including a column with forwardIndexDisabled enabled on both segments
+      String query = "SELECT column1, column5, column6, column9, column10 FROM testTable WHERE column6 = 1001";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query with filters including a column with forwardIndexDisabled enabled on one segment
+      String query = "SELECT column1, column5, column7, column9, column10 FROM testTable WHERE column7 = 2147483647";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query without filters and without columns with forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_120L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        // Column 1
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 5
+      assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+    }
+    {
+      // Transform function on a selection clause with a forwardIndexDisabled column in transform
+      String query = "SELECT ARRAYLENGTH(column6) from testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column in transform");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query with filters (not including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column1 > 100000000"
+          + " AND column2 BETWEEN 20000000 AND 1000000000"
+          + " AND column3 <> 'w'"
+          + " AND daysSinceEpoch = 1756015683 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 62_700L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 62_820);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 304_120L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 5
+      assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 = 1001 "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 8);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 8L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 32L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals(resultRow[0], 1849044734);
+        assertEquals((String) resultRow[1], "AKXcXcIqsqOJFsdwxZ");
+        assertEquals(resultRow[2], 674022574);
+        assertEquals(resultRow[3], 674022574);
+      }
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 != 201 "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 399_896L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_016L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 5
+      assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 IN (201, 2147483647) "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 199_860L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 199_980L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 5
+      assertEquals((String) firstRow[1], "AKXcXcIqsqOJFsdwxZ");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 NOT IN "
+          + "(1001, 2147483647) ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 174_552L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 174_672L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column10"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 5
+      assertEquals((String) firstRow[1], "EOFxevm");
+    }
+    {
+      // Query with literal only in SELECT
+      String query = "SELECT 'marvin' from testTable ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 400_000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"'marvin'"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertEquals((String) resultRow[0], "marvin");
+      }
+    }
+    {
+      // Selection query with '<' filter on a forwardIndexDisabled column without range index available
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column6 < 2147483647 AND "
+          + "column6 >= 1001 ORDER BY column1";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("scan based filtering not supported!"));
+      }
+    }
+    {
+      // Selection query with '>=' filter on a forwardIndexDisabled column without range index available
+      String query = "SELECT column1, column5, column9, column10 FROM testTable WHERE column7 > 201";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("scan based filtering not supported!"));
+      }
+    }
+    {
+      // Select query with a filter on a column which doesn't have forwardIndexDisabled enabled
+      String query = "SELECT column1, column5, column9 from testTable WHERE column9 < 3890167 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 48L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 128L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 400_000L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5", "column9"},
+          new DataSchema.ColumnDataType[]{
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT
+          }));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1Value = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 3);
+        assertTrue((Integer) resultRow[0] >= previousColumn1Value);
+        previousColumn1Value = (Integer) resultRow[0];
+        assertEquals(resultRow[2], 3890166);
+      }
+    }
+    {
+      // Transform function on a filter clause for forwardIndexDisabled column in transform
+      String query = "SELECT column1, column10 from testTable WHERE ARRAYLENGTH(column6) = 2";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() != null
+          && brokerResponseNative.getProcessingExceptions().size() > 0);
+    }
+  }
+
+  @Test
+  public void testSelectWithDistinctQueries() {
+    // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with distinct
+    String query = "SELECT DISTINCT column1, column6, column9 FROM testTable LIMIT 10";
+    try {
+      getBrokerResponse(query);
+      Assert.fail("Query should fail since forwardIndexDisabled on a column in select distinct");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Forward index disabled for column:")
+          && e.getMessage().contains("cannot create DataFetcher!"));
+    }
+  }
+
+  @Test
+  public void testSelectWithGroupByOrderByQueries() {
+    {
+      // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with group by order by
+      String query = "SELECT column1, column6 FROM testTable GROUP BY column1, column6 ORDER BY column1, column6 "
+          + " LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select forwardIndexDisabled columns with group by order by
+      String query = "SELECT column7, column6 FROM testTable GROUP BY column7, column6 ORDER BY column7, column6 "
+          + " LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select non-forwardIndexDisabled columns with group by order by
+      String query = "SELECT column1, column5 FROM testTable GROUP BY column1, column5 ORDER BY column1, column5 "
+          + " LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 400000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 800000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousVal = -1;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertTrue((int) resultRow[0] > previousVal);
+        previousVal = (int) resultRow[0];
+      }
+    }
+    {
+      // Select forwardIndexDisabled columns using transform with group by order by
+      String query = "SELECT ARRAYLENGTH(column6) FROM testTable GROUP BY ARRAYLENGTH(column6) ORDER BY "
+          + "ARRAYLENGTH(column6) LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Test a select with a VALUEIN transform function with group by
+      String query = "SELECT VALUEIN(column6, '1001') from testTable WHERE column6 IN (1001) GROUP BY "
+          + "VALUEIN(column6, '1001') LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+  }
+
+  @Test
+  public void testSelectWithAggregationQueries() {
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns
+      String query = "SELECT maxmv(column7), minmv(column6), minmaxrangemv(column6), distinctcountmv(column7), "
+          + "distinctcounthllmv(column6), distinctcountrawhllmv(column7) from testTable";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 400_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"maxmv(column7)", "minmv(column6)",
+          "minmaxrangemv(column6)", "distinctcountmv(column7)", "distinctcounthllmv(column6)",
+          "distinctcountrawhllmv(column7)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+              DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
+              DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 6);
+        assertEquals(resultRow[0], 2.147483647E9);
+        assertEquals(resultRow[1], 1001.0);
+        assertEquals(resultRow[2], 2.147482646E9);
+        assertEquals(resultRow[3], 359);
+        assertEquals(resultRow[4], 20039L);
+      }
+    }
+    {
+      // Not allowed aggregation functions on forwardIndexDisabled columns
+      String query = "SELECT summv(column7), avgmv(column6) from testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with group by on non-forwardIndexDisabled
+      // column
+      String query = "SELECT column1, maxmv(column6) from testTable GROUP BY column1";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with group by order by on
+      // non-forwardIndexDisabled column
+      String query = "SELECT column1, maxmv(column6) from testTable GROUP BY column1 ORDER BY column1";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with group by on non-forwardIndexDisabled
+      // column but order by on allowed aggregation function on forwardIndexDisabled column
+      String query = "SELECT column1, max(column9) from testTable GROUP BY column1 ORDER BY minmv(column6)";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+      // fails
+      String query = "SELECT maxmv(column7), minmv(column6) from testTable WHERE column7 = 201";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+      // fails
+      String query = "SELECT max(column1), minmv(column6) from testTable WHERE column1 > 15935";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column
+      String query = "SELECT max(column1), sum(column9) from testTable WHERE column7 = 2147483647";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertEquals(resultRow[0], 2.147313491E9);
+        assertEquals(resultRow[1], 1.38051889779548E14);
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column and group by order by on non-forwardIndexDisabled column
+      String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 2147483647 GROUP BY "
+          + "column1 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 400_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 199_756L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 399_512L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+              DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousVal = -1;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 3);
+        assertTrue((int) resultRow[0] > previousVal);
+        previousVal = (int) resultRow[0];
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column and group by on non-forwardIndexDisabled column order by on forwardIndexDisabled aggregation column
+      String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 201 GROUP BY "
+          + "column1 ORDER BY minmv(column6)";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column
+      String query = "SELECT MAX(ARRAYLENGTH(column6)) from testTable LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column with group by
+      String query = "SELECT column1, MAX(ARRAYLENGTH(column6)) from testTable GROUP BY column1 LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column with group by order by
+      String query = "SELECT column1, MAX(ARRAYLENGTH(column6)) from testTable GROUP BY column1 ORDER BY column1 "
+          + "DESC LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
new file mode 100644
index 0000000000..44536e0951
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ForwardIndexDisabledSingleValueQueriesTest.java
@@ -0,0 +1,1217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * The <code>ForwardIndexDisabledSingleValueQueriesTest</code> class sets up the index segment for the no forward
+ * index single-value queries test.
+ * <p>There are totally 18 columns, 30000 records inside the original Avro file where 11 columns are selected to build
+ * the index segment. Selected columns information are as following:
+ * <ul>
+ *   ColumnName, FieldType, DataType, Cardinality, IsSorted, HasInvertedIndex, FwdIndexDisabled: S1, S2, HasRangeIndex
+ *   <li>column1, METRIC, INT, 6582, F, F, F, F, F</li>
+ *   <li>column3, METRIC, INT, 21910, F, F, F, F, F</li>
+ *   <li>column5, DIMENSION, STRING, 1, T, F, F, F, F</li>
+ *   <li>column6, DIMENSION, INT, 608, F, T, T, T, T</li>
+ *   <li>column7, DIMENSION, INT, 146, F, T, T, F, F</li>
+ *   <li>column9, DIMENSION, INT, 1737, F, F, F, F, F</li>
+ *   <li>column11, DIMENSION, STRING, 5, F, T, F, F, F</li>
+ *   <li>column12, DIMENSION, STRING, 5, F, F, F, F, F</li>
+ *   <li>column17, METRIC, INT, 24, F, T, F, F, F</li>
+ *   <li>column18, METRIC, INT, 1440, F, T, F, F, F</li>
+ *   <li>daysSinceEpoch, TIME, INT, 2, T, F, F, F, F</li>
+ * </ul>
+ */
+public class ForwardIndexDisabledSingleValueQueriesTest extends BaseQueriesTest {
+  private static final String AVRO_DATA = "data" + File.separator + "test_data-sv.avro";
+  private static final String SEGMENT_NAME_1 = "testTable_126164076_167572857";
+  private static final String SEGMENT_NAME_2 = "testTable_126164076_167572858";
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
+      "ForwardIndexDisabledSingleValueQueriesTest");
+
+  private static final String SELECT_STAR_QUERY = "SELECT * FROM testTable";
+
+  // Hard-coded query filter.
+  private static final String FILTER = " WHERE column1 > 100000000"
+      + " AND column3 BETWEEN 20000000 AND 1000000000"
+      + " AND column5 = 'gFuH'"
+      + " AND (column6 < 500000000 OR column11 NOT IN ('t', 'P'))"
+      + " AND daysSinceEpoch = 126164076";
+
+  private IndexSegment _indexSegment;
+  // Contains 2 index segments, one with 2 columns with forward index disabled, and the other with just 1.
+  private List<IndexSegment> _indexSegments;
+
+  private TableConfig _tableConfig;
+  private List<String> _invertedIndexColumns;
+  private List<String> _forwardIndexDisabledColumns;
+
+  @BeforeClass
+  public void buildAndLoadSegment()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    // Get resource file path.
+    URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
+    assertNotNull(resource);
+    String filePath = resource.getFile();
+
+    // Build the segment schema.
+    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable").addMetric("column1", FieldSpec.DataType.INT)
+        .addMetric("column3", FieldSpec.DataType.INT).addSingleValueDimension("column5", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column6", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column7", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column9", FieldSpec.DataType.INT)
+        .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
+        .addMetric("column18", FieldSpec.DataType.INT)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+
+    createSegment(filePath, SEGMENT_NAME_1, schema);
+    createSegment(filePath, SEGMENT_NAME_2, schema);
+
+    ImmutableSegment immutableSegment1 = loadSegmentWithMetadataChecks(SEGMENT_NAME_1);
+    ImmutableSegment immutableSegment2 = loadSegmentWithMetadataChecks(SEGMENT_NAME_2);
+
+    _indexSegment = immutableSegment1;
+    _indexSegments = Arrays.asList(immutableSegment1, immutableSegment2);
+  }
+
+  private void createSegment(String filePath, String segmentName, Schema schema)
+      throws Exception {
+    // Create field configs for the no forward index columns
+    List<FieldConfig> fieldConfigList = new ArrayList<>();
+    fieldConfigList.add(new FieldConfig("column6", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(), null,
+        Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+    if (segmentName.equals(SEGMENT_NAME_1)) {
+      fieldConfigList.add(new FieldConfig("column7", FieldConfig.EncodingType.DICTIONARY, Collections.emptyList(),
+          null, Collections.singletonMap(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString())));
+
+      // Build table config based on segment 1 as it contains both columns under no forward index
+      _tableConfig =
+          new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch")
+              .setFieldConfigList(fieldConfigList).setRangeIndexColumns(Arrays.asList("column6")).build();
+    }
+
+    // Create the segment generator config.
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(filePath);
+    segmentGeneratorConfig.setTableName("testTable");
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setSegmentName(segmentName);
+    // The segment generation code in SegmentColumnarIndexCreator will throw
+    // exception if start and end time in time column are not in acceptable
+    // range. For this test, we first need to fix the input avro data
+    // to have the time column values in allowed range. Until then, the check
+    // is explicitly disabled
+    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+    _invertedIndexColumns = Arrays.asList("column6", "column7", "column11", "column17", "column18");
+    segmentGeneratorConfig.setInvertedIndexCreationColumns(_invertedIndexColumns);
+
+    _forwardIndexDisabledColumns = new ArrayList<>(Arrays.asList("column6", "column7"));
+    segmentGeneratorConfig.setForwardIndexDisabledColumns(_forwardIndexDisabledColumns);
+    segmentGeneratorConfig.setRangeIndexCreationColumns(Arrays.asList("column6"));
+
+    // Build the index segment.
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+  }
+
+  private ImmutableSegment loadSegmentWithMetadataChecks(String segmentName)
+      throws Exception {
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(_tableConfig);
+    indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(_invertedIndexColumns));
+    indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>(_forwardIndexDisabledColumns));
+    indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Arrays.asList("column6")));
+    indexLoadingConfig.setReadMode(ReadMode.heap);
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
+        indexLoadingConfig);
+
+    Map<String, ColumnMetadata> columnMetadataMap1 = immutableSegment.getSegmentMetadata().getColumnMetadataMap();
+    columnMetadataMap1.forEach((column, metadata) -> {
+      if (column.equals("column6") || column.equals("column7")) {
+        assertTrue(metadata.hasDictionary());
+        assertTrue(metadata.isSingleValue());
+        assertNull(immutableSegment.getForwardIndex(column));
+      } else {
+        assertNotNull(immutableSegment.getForwardIndex(column));
+      }
+    });
+
+    return immutableSegment;
+  }
+
+  @AfterClass
+  public void deleteAndDestroySegment() {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    _indexSegments.forEach((IndexSegment::destroy));
+  }
+
+  @Override
+  protected String getFilter() {
+    return FILTER;
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @Test
+  public void testSelectStarQueries() {
+    // Select * without any filters
+    try {
+      getBrokerResponse(SELECT_STAR_QUERY);
+      Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Forward index disabled for column:")
+          && e.getMessage().contains("cannot create DataFetcher!"));
+    }
+
+    // Select * with filters
+    try {
+      getBrokerResponse(SELECT_STAR_QUERY + FILTER);
+      Assert.fail("Select * query should fail since forwardIndexDisabled on a select column");
+    } catch (IllegalStateException e) {
+      assertTrue(e.getMessage().contains("Forward index disabled for column:")
+          && e.getMessage().contains("cannot create DataFetcher!"));
+    }
+  }
+
+  @Test
+  public void testSelectQueries() {
+    {
+      // Selection query without filters including a column with forwardIndexDisabled enabled on both segments
+      String query = "SELECT column1, column5, column6, column9, column11 FROM testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query without filters including a column with forwardIndexDisabled enabled on one segment
+      String query = "SELECT column1, column5, column7, column9, column11 FROM testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query with filters including a column with forwardIndexDisabled enabled on both segments
+      String query = "SELECT column1, column5, column6, column9, column11 FROM testTable WHERE column6 = 2147458029";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query with filters including a column with forwardIndexDisabled enabled on one segment
+      String query = "SELECT column1, column5, column7, column9, column11 FROM testTable WHERE column7 = 675695";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a select column");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Selection query without filters and without columns with forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_120L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        // Column 1
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "o");
+    }
+    {
+      // Transform function on a selection clause with a forwardIndexDisabled column in transform
+      String query = "SELECT CONCAT(column6, column9, '-') from testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in transform");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform function on a selection clause without a forwardIndexDisabled column in transform
+      String query = "SELECT CONCAT(column5, column9, '-') from testTable ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_080L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"concat(column5,column9,'-')"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertTrue(resultRow[0].toString().startsWith("gFuH-"));
+      }
+    }
+    {
+      // Selection query with filters (not including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column1 > 100000000"
+        + " AND column3 BETWEEN 20000000 AND 1000000000"
+        + " AND column5 = 'gFuH'"
+        + " AND daysSinceEpoch = 126164076 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 42_368L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 42_488);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 192744L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "P");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 = 2147458029 "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 16L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 64L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals(resultRow[0], 635553468);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertEquals(resultRow[2], 705242697);
+        assertEquals((String) resultRow[3], "P");
+      }
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 != 675695 "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 119_908L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_028L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "o");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 IN (675695, 2137685743) "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 828L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 948L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "P");
+    }
+    {
+      // Selection query with supported filters (including forwardIndexDisabled column) and without columns with
+      // forwardIndexDisabled enabled on either segment
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 NOT IN "
+          + "(1689277, 2147458029) ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 119_980L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_100L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "o");
+    }
+    {
+      // Query with literal only in SELECT
+      String query = "SELECT 'marvin' from testTable ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"'marvin'"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertEquals((String) resultRow[0], "marvin");
+      }
+    }
+    {
+      // Selection query with '<' filter on a forwardIndexDisabled column with range index available
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column6 < 2147458029 AND "
+          + "column6 > 1699000 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 119_980L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 120_100L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      DataSchema dataSchema = new DataSchema(new String[]{"column1", "column5", "column9", "column11"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
+      assertEquals(resultTable.getDataSchema(), dataSchema);
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 4);
+        assertEquals((String) resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+
+      Object[] firstRow = resultRows.get(0);
+      // Column 11
+      assertEquals((String) firstRow[3], "o");
+    }
+    {
+      // Selection query with '>=' filter on a forwardIndexDisabled column without range index available
+      String query = "SELECT column1, column5, column9, column11 FROM testTable WHERE column7 >= 676000";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a range query column without range index");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("scan based filtering not supported!"));
+      }
+    }
+    {
+      // Select query with a filter on a column which doesn't have forwardIndexDisabled enabled
+      String query = "SELECT column1, column5, column9 from testTable WHERE column9 < 50000 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 4);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 12L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5", "column9"},
+          new DataSchema.ColumnDataType[]{
+          DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT
+      }));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 3);
+      }
+    }
+    {
+      // Transform function on a filter clause for forwardIndexDisabled column in transform
+      String query = "SELECT column1, column11 from testTable WHERE CONCAT(column6, column9, '-') = '1689277-11270'";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() != null
+          && brokerResponseNative.getProcessingExceptions().size() > 0);
+    }
+    {
+      // Transform function on a filter clause for a non-forwardIndexDisabled column in transform
+      String query = "SELECT column1, column11 from testTable WHERE CONCAT(column5, column9, '-') = 'gFuH-11270' "
+          + "ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 4);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 8L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column11"},
+          new DataSchema.ColumnDataType[]{
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+          }));
+      List<Object[]> resultRows = resultTable.getRows();
+      assertEquals(resultRows.size(), 4);
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertEquals(resultRow[0], 815409257);
+        assertEquals(resultRow[1], "P");
+      }
+    }
+    {
+      // Transform function on a filter clause for forwardIndexDisabled column in transform
+      String query = "SELECT column1, column11 from testTable WHERE CONCAT(ADD(column6, column1), column9, '-') = "
+          + "'1689277-11270'";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() != null
+          && brokerResponseNative.getProcessingExceptions().size() > 0);
+    }
+    {
+      // Transform function on a filter clause for a non-forwardIndexDisabled column in transform
+      String query = "SELECT column1, column11 from testTable WHERE CONCAT(column5, ADD(column9, column1), '-') = "
+          + "'gFuH-2.96708164E8' ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 28L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 56L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 120000L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column11"},
+          new DataSchema.ColumnDataType[]{
+              DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING
+          }));
+      List<Object[]> resultRows = resultTable.getRows();
+      assertEquals(resultRows.size(), 10);
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertEquals(resultRow[0], 240528);
+        assertEquals(resultRow[1], "o");
+      }
+    }
+  }
+
+  @Test
+  public void testSelectWithDistinctQueries() {
+    {
+      // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with distinct
+      String query = "SELECT DISTINCT column1, column6, column9 FROM testTable LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select distinct");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select non-forwardIndexDisabled columns with distinct
+      String query = "SELECT DISTINCT column1, column5, column9 FROM testTable ORDER BY column1 LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 360_000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5", "column9"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING,
+              DataSchema.ColumnDataType.INT}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousColumn1 = Integer.MIN_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 3);
+        assertEquals(resultRow[1], "gFuH");
+        assertTrue((Integer) resultRow[0] >= previousColumn1);
+        previousColumn1 = (Integer) resultRow[0];
+      }
+    }
+  }
+
+  @Test
+  public void testSelectWithGroupByOrderByQueries() {
+    {
+      // Select a mix of forwardIndexDisabled and non-forwardIndexDisabled columns with group by order by
+      String query = "SELECT column1, column6 FROM testTable GROUP BY column1, column6 ORDER BY column1, column6 "
+          + " LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select forwardIndexDisabled columns with group by order by
+      String query = "SELECT column7, column6 FROM testTable GROUP BY column7, column6 ORDER BY column7, column6 "
+          + " LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in select group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select non-forwardIndexDisabled columns with group by order by
+      String query = "SELECT column1, column5 FROM testTable GROUP BY column1, column5 ORDER BY column1, column5 "
+          + " LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "column5"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousVal = -1;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertEquals(resultRow[1], "gFuH");
+        assertTrue((int) resultRow[0] > previousVal);
+        previousVal = (int) resultRow[0];
+      }
+    }
+    {
+      // Select forwardIndexDisabled columns using transform with group by order by
+      String query = "SELECT CONCAT(column1, column6, '-') FROM testTable GROUP BY CONCAT(column1, column6, '-') "
+          + "ORDER BY CONCAT(column1, column6, '-') LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a transformed column in group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select non-forwardIndexDisabled columns using transform with group by order by
+      String query = "SELECT CONCAT(column1, column5, '-') FROM testTable GROUP BY CONCAT(column1, column5, '-') "
+          + "ORDER BY CONCAT(column1, column5, '-') LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"concat(column1,column5,'-')"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertTrue(resultRow[0].toString().endsWith("-gFuH"));
+      }
+    }
+    {
+      // Select forwardIndexDisabled columns using nested transform with group by order by
+      String query = "SELECT CONCAT(ADD(column1, column6), column5, '-') FROM testTable GROUP BY "
+          + "CONCAT(ADD(column1, column6), column5, '-') ORDER BY CONCAT(ADD(column1, column6), column5, '-') LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail for forwardIndexDisabled on a nested transformed column in group by order by");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Select non-forwardIndexDisabled columns using nested transform with group by order by
+      String query = "SELECT CONCAT(ADD(column1, column9), column5, '-') FROM testTable GROUP BY "
+          + "CONCAT(ADD(column1, column9), column5, '-') ORDER BY CONCAT(ADD(column1, column9), column5, '-') LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 360000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"concat(add(column1,column9),column5,'-')"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertTrue(resultRow[0].toString().endsWith("-gFuH"));
+      }
+    }
+  }
+
+  @Test
+  public void testSelectWithAggregationQueries() {
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns
+      String query = "SELECT max(column7), min(column7), count(column7), minmaxrange(column7), "
+          + "distinctcount(column7), distinctcounthll(column6), distinctcountrawhll(column7), "
+          + "distinctcountsmarthll(column6) from testTable";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 0L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(column7)", "min(column7)", "count(*)",
+      "minmaxrange(column7)", "distinctcount(column7)", "distinctcounthll(column6)", "distinctcountrawhll(column7)",
+      "distinctcountsmarthll(column6)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+              DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT,
+              DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 8);
+        assertEquals(resultRow[0], 2.137685743E9);
+        assertEquals(resultRow[1], 675695.0);
+        assertEquals(resultRow[2], 120000L);
+        assertEquals(resultRow[3], 2.137010048E9);
+        assertEquals(resultRow[4], 146);
+        assertEquals(resultRow[5], 695L);
+        assertEquals(resultRow[7], 608);
+      }
+    }
+    {
+      // Not allowed aggregation functions on forwardIndexDisabled columns
+      String query = "SELECT sum(column7), avg(column6) from testTable";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with group by on non-forwardIndexDisabled
+      // column
+      String query = "SELECT column1, max(column6) from testTable GROUP BY column1";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with group by order by on
+      // non-forwardIndexDisabled column
+      String query = "SELECT column1, max(column6) from testTable GROUP BY column1 ORDER BY column1";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with group by on non-forwardIndexDisabled
+      // column but order by on allowed aggregation function on forwardIndexDisabled column
+      String query = "SELECT column1, max(column9) from testTable GROUP BY column1 ORDER BY min(column6)";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+      // fails
+      String query = "SELECT max(column7), min(column6) from testTable WHERE column7 = 675695";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on forwardIndexDisabled columns with a filter - results in trying to scan which
+      // fails
+      String query = "SELECT max(column1), min(column6) from testTable WHERE column1 > 675695";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column
+      String query = "SELECT max(column1), sum(column9) from testTable WHERE column7 = 675695";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 92L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 184L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertEquals(resultRow[0], 2.106334109E9);
+        assertEquals(resultRow[1], 5.7037961104E10);
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column and group by order by on non-forwardIndexDisabled column
+      String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 675695 GROUP BY "
+          + "column1 ORDER BY column1";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 92L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 184L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(column1)", "sum(column9)"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE,
+              DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousVal = -1;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 3);
+        assertTrue((int) resultRow[0] > previousVal);
+        previousVal = (int) resultRow[0];
+      }
+    }
+    {
+      // Allowed aggregation functions on non-forwardIndexDisabled columns with a filter on a forwardIndexDisabled
+      // column and group by on non-forwardIndexDisabled column order by on forwardIndexDisabled aggregation column
+      String query = "SELECT column1, max(column1), sum(column9) from testTable WHERE column7 = 675695 GROUP BY "
+          + "column1 ORDER BY max(column6)";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column
+      String query = "SELECT MAX(ADD(column6, column9)) from testTable LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation not involving any forwardIndexDisabled columns
+      String query = "SELECT MAX(ADD(column1, column9)) from testTable LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 1);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"max(add(column1,column9))"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 1);
+        assertEquals(resultRow[0], 4.264013718E9);
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column with group by
+      String query = "SELECT column1, MAX(ADD(column6, column9)) from testTable GROUP BY column1 LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation not involving any forwardIndexDisabled column with group by
+      String query = "SELECT column1, MAX(ADD(column1, column9)) from testTable GROUP BY column1 LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(add(column1,column9))"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+      }
+    }
+    {
+      // Transform inside aggregation involving a forwardIndexDisabled column with group by order by
+      String query = "SELECT column1, MAX(ADD(column6, column9)) from testTable GROUP BY column1 ORDER BY column1 "
+          + "DESC LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in unsupported aggregation query");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // Transform inside aggregation not involving any forwardIndexDisabled column with group by order by
+      String query = "SELECT column1, MAX(ADD(column1, column9)) from testTable GROUP BY column1 ORDER BY column1 "
+          + "DESC LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"column1", "max(add(column1,column9))"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE}));
+      List<Object[]> resultRows = resultTable.getRows();
+      int previousVal = Integer.MAX_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertTrue((int) resultRow[0] <= previousVal);
+        previousVal = (int) resultRow[0];
+      }
+    }
+  }
+
+  @Test
+  public void testSelectWithAggregationGroupByHaving() {
+    {
+      // forwardIndexDisabled column used in HAVING clause
+      String query = "SELECT min(column7), max(column6) from testTable GROUP BY column1 HAVING min(column7) > 675695 "
+          + "ORDER BY column1 LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in HAVING clause");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // forwardIndexDisabled column not used in HAVING clause but used in aggregation select
+      String query = "SELECT max(column6), min(column9) from testTable GROUP BY column1 HAVING min(column9) > 11270 "
+          + "ORDER BY column1 LIMIT 10";
+      try {
+        getBrokerResponse(query);
+        Assert.fail("Query should fail since forwardIndexDisabled on a column in HAVING clause");
+      } catch (IllegalStateException e) {
+        assertTrue(e.getMessage().contains("Forward index disabled for column:")
+            && e.getMessage().contains("cannot create DataFetcher!"));
+      }
+    }
+    {
+      // forwardIndexDisabled column not used in HAVING clause or aggregation select
+      String query = "SELECT min(column9), column1 from testTable GROUP BY column1, column9 HAVING min(column9) "
+          + "> 11270 ORDER BY column9 DESC LIMIT 10";
+      BrokerResponseNative brokerResponseNative = getBrokerResponse(query);
+      assertTrue(brokerResponseNative.getProcessingExceptions() == null
+          || brokerResponseNative.getProcessingExceptions().size() == 0);
+      ResultTable resultTable = brokerResponseNative.getResultTable();
+      assertEquals(brokerResponseNative.getNumRowsResultSet(), 10);
+      assertEquals(brokerResponseNative.getTotalDocs(), 120_000L);
+      assertEquals(brokerResponseNative.getNumDocsScanned(), 120_000L);
+      assertEquals(brokerResponseNative.getNumSegmentsProcessed(), 4L);
+      assertEquals(brokerResponseNative.getNumSegmentsMatched(), 4L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedPostFilter(), 240000L);
+      assertEquals(brokerResponseNative.getNumEntriesScannedInFilter(), 0L);
+      assertNotNull(brokerResponseNative.getProcessingExceptions());
+      assertEquals(brokerResponseNative.getProcessingExceptions().size(), 0);
+      assertEquals(resultTable.getDataSchema(), new DataSchema(new String[]{"min(column9)", "column1"},
+          new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT}));
+      List<Object[]> resultRows = resultTable.getRows();
+      double previousVal = Double.MAX_VALUE;
+      for (Object[] resultRow : resultRows) {
+        assertEquals(resultRow.length, 2);
+        assertTrue((double) resultRow[0] <= previousVal);
+        previousVal = (double) resultRow[0];
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
index 24ed15aa4c..cff6260b99 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableColumnStatistics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.realtime.converter.stats;
 
+import com.google.common.base.Preconditions;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Set;
@@ -145,6 +146,8 @@ public class MutableColumnStatistics implements ColumnStatistics {
 
     // Iterate over all data to figure out whether or not it's in sorted order
     MutableForwardIndex mutableForwardIndex = (MutableForwardIndex) _dataSource.getForwardIndex();
+    Preconditions.checkState(mutableForwardIndex != null,
+        String.format("Forward index should not be null for column: %s", dataSourceMetadata.getFieldSpec().getName()));
     int numDocs = dataSourceMetadata.getNumDocs();
     // Iterate with the sorted order if provided
     if (_sortedDocIdIterationOrder != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
index 55c714224e..c4a3c910b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/stats/MutableNoDictionaryColStatistics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.realtime.converter.stats;
 
+import com.google.common.base.Preconditions;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.segment.spi.creator.ColumnStatistics;
@@ -36,6 +37,8 @@ public class MutableNoDictionaryColStatistics implements ColumnStatistics {
   public MutableNoDictionaryColStatistics(DataSource dataSource) {
     _dataSourceMetadata = dataSource.getDataSourceMetadata();
     _forwardIndex = (MutableForwardIndex) dataSource.getForwardIndex();
+    Preconditions.checkState(_forwardIndex != null,
+        String.format("Forward index should not be null for column: %s", _dataSourceMetadata.getFieldSpec().getName()));
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
index 7cb36a4db6..f42966fcc9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/DefaultIndexCreatorProvider.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.bloom.OnHeapGuavaBloo
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.fwd.NoOpForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueFixedByteRawIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
@@ -74,6 +75,7 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
   public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context)
       throws Exception {
     if (!context.hasDictionary()) {
+      // Dictionary disabled columns
       boolean deriveNumDocsPerChunk =
           shouldDeriveNumDocsPerChunk(context.getFieldSpec().getName(), context.getColumnProperties());
       int writerVersion = getRawIndexWriterVersion(context.getFieldSpec().getName(), context.getColumnProperties());
@@ -88,17 +90,25 @@ public final class DefaultIndexCreatorProvider implements IndexCreatorProvider {
             context.getMaxRowLengthInBytes());
       }
     } else {
-      if (context.getFieldSpec().isSingleValueField()) {
-        if (context.isSorted()) {
-          return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-              context.getCardinality());
+      // Dictionary enabled columns
+      if (context.forwardIndexDisabled() && !context.isSorted()) {
+        // Forward index disabled columns which aren't sorted
+        // Sorted columns treat this option as a no-op
+        return new NoOpForwardIndexCreator(context.getFieldSpec().isSingleValueField());
+      } else {
+        // Forward index enabled columns
+        if (context.getFieldSpec().isSingleValueField()) {
+          if (context.isSorted()) {
+            return new SingleValueSortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+                context.getCardinality());
+          } else {
+            return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+                context.getCardinality(), context.getTotalDocs());
+          }
         } else {
-          return new SingleValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-              context.getCardinality(), context.getTotalDocs());
+          return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
+              context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
         }
-      } else {
-        return new MultiValueUnsortedForwardIndexCreator(context.getIndexDir(), context.getFieldSpec().getName(),
-            context.getCardinality(), context.getTotalDocs(), context.getTotalNumberOfEntries());
       }
     }
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
index d27b33e7a5..ba6f668e36 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -35,6 +35,7 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.utils.FileUtils;
 import org.apache.pinot.segment.local.io.util.PinotDataBitSet;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
 import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
 import org.apache.pinot.segment.local.utils.GeometrySerializer;
@@ -174,6 +175,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           "Cannot create json index for column: %s because it is not in schema", columnName);
     }
 
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    for (String columnName : _config.getForwardIndexDisabledColumns()) {
+      Preconditions.checkState(schema.hasColumn(columnName), String.format("Invalid config. Can't disable "
+          + "forward index creation for a column: %s that does not exist in schema", columnName));
+      forwardIndexDisabledColumns.add(columnName);
+    }
+
     Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
     for (String columnName : h3IndexConfigs.keySet()) {
       Preconditions.checkState(schema.hasColumn(columnName),
@@ -196,6 +204,10 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       Preconditions.checkState(dictEnabledColumn || !invertedIndexColumns.contains(columnName),
           "Cannot create inverted index for raw index column: %s", columnName);
 
+      boolean forwardIndexDisabled = forwardIndexDisabledColumns.contains(columnName);
+      validateForwardIndexDisabledIndexCompatibility(columnName, forwardIndexDisabled, dictEnabledColumn,
+          columnIndexCreationInfo, invertedIndexColumns, rangeIndexColumns, rangeIndexVersion, fieldSpec);
+
       IndexCreationContext.Common context = IndexCreationContext.builder()
           .withIndexDir(_indexDir)
           .withCardinality(columnIndexCreationInfo.getDistinctValueCount())
@@ -208,6 +220,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           .withColumnIndexCreationInfo(columnIndexCreationInfo)
           .sorted(columnIndexCreationInfo.isSorted())
           .onHeap(segmentCreationSpec.isOnHeap())
+          .withforwardIndexDisabled(forwardIndexDisabled)
           .build();
       // Initialize forward index creator
       ChunkCompressionType chunkCompressionType =
@@ -298,6 +311,45 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
+  /**
+   * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
+   * compatibility mismatch. The checks performed are:
+   *     - Validate dictionary is enabled.
+   *     - Validate inverted index is enabled.
+   *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
+   *       multi-value column (since multi-value defaults to index v1).
+   *
+   * @param columnName Name of the column
+   * @param forwardIndexDisabled Whether the forward index is disabled for column or not
+   * @param dictEnabledColumn Whether the column is dictionary enabled or not
+   * @param columnIndexCreationInfo Column index creation info
+   * @param invertedIndexColumns Set of columns with inverted index enabled
+   * @param rangeIndexColumns Set of columns with range index enabled
+   * @param rangeIndexVersion Range index version
+   * @param fieldSpec FieldSpec of column
+   */
+  private void validateForwardIndexDisabledIndexCompatibility(String columnName, boolean forwardIndexDisabled,
+      boolean dictEnabledColumn, ColumnIndexCreationInfo columnIndexCreationInfo, Set<String> invertedIndexColumns,
+      Set<String> rangeIndexColumns, int rangeIndexVersion, FieldSpec fieldSpec) {
+    if (!forwardIndexDisabled) {
+      return;
+    }
+
+    Preconditions.checkState(dictEnabledColumn,
+        String.format("Cannot disable forward index for column %s without dictionary", columnName));
+    Preconditions.checkState(invertedIndexColumns.contains(columnName),
+        String.format("Cannot disable forward index for column %s without inverted index enabled", columnName));
+    if (rangeIndexColumns.contains(columnName)) {
+      Preconditions.checkState(fieldSpec.isSingleValueField(),
+          String.format("Feature not supported for multi-value columns with range index. Cannot disable forward index "
+              + "for column %s. Disable range index on this column to use this feature", columnName));
+      Preconditions.checkState(rangeIndexVersion == BitSlicedRangeIndexCreator.VERSION,
+          String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable "
+              + "forward index for column %s. Either disable range index or create range index with version >= 2 to "
+              + "use this feature", columnName));
+    }
+  }
+
   /**
    * Returns true if dictionary should be created for a column, false otherwise.
    * Currently there are two sources for this config:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
new file mode 100644
index 0000000000..bc990ac616
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/NoOpForwardIndexCreator.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.segment.creator.impl.fwd;
+
+import java.io.IOException;
+import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * Forward index creator for dictionary-encoded single and multi-value columns with forward index disabled.
+ * This is a no-op.
+ */
+public class NoOpForwardIndexCreator implements ForwardIndexCreator {
+  private final boolean _isSingleValue;
+
+  public NoOpForwardIndexCreator(boolean isSingleValue) {
+    _isSingleValue = isSingleValue;
+  }
+
+  @Override
+  public boolean isDictionaryEncoded() {
+    return true;
+  }
+
+  @Override
+  public boolean isSingleValue() {
+    return _isSingleValue;
+  }
+
+  @Override
+  public FieldSpec.DataType getValueType() {
+    return FieldSpec.DataType.INT;
+  }
+
+  @Override
+  public void putDictId(int dictId) {
+  }
+
+  @Override
+  public void putDictIdMV(int[] dictIds) {
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index beeb768a5c..3a32aa8ada 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -133,7 +133,10 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       _rangeIndex = null;
     }
 
-    PinotDataBuffer fwdIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX);
+    // Setting the 'fwdIndexBuffer' to null if forward index is disabled
+    PinotDataBuffer fwdIndexBuffer = segmentReader.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX)
+        ? segmentReader.getIndexFor(columnName, ColumnIndexType.FORWARD_INDEX) : null;
+
     if (metadata.hasDictionary()) {
       // Dictionary-based index
       _dictionary = loadDictionary(segmentReader.getIndexFor(columnName, ColumnIndexType.DICTIONARY), metadata,
@@ -142,6 +145,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
         // Single-value
         if (metadata.isSorted()) {
           // Sorted
+          // No need to check for null 'fwdIndexBuffer' as for sorted columns this is a no-op
           SortedIndexReader<?> sortedIndexReader = indexReaderProvider.newSortedIndexReader(fwdIndexBuffer, metadata);
           _forwardIndex = sortedIndexReader;
           _invertedIndex = sortedIndexReader;
@@ -149,7 +153,12 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
           return;
         }
       }
-      _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
+      if (fwdIndexBuffer != null) {
+        _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
+      } else {
+        // Forward index disabled
+        _forwardIndex = null;
+      }
       if (loadInvertedIndex) {
         _invertedIndex = indexReaderProvider.newInvertedIndexReader(
             segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata);
@@ -266,7 +275,9 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   @Override
   public void close()
       throws IOException {
-    _forwardIndex.close();
+    if (_forwardIndex != null) {
+      _forwardIndex.close();
+    }
     if (_invertedIndex != null) {
       _invertedIndex.close();
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
index 828b7e4b51..cb40a4a453 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
@@ -45,7 +45,7 @@ public abstract class BaseDataSource implements DataSource {
   private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReader _nullValueVector;
 
-  public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex,
+  public BaseDataSource(DataSourceMetadata dataSourceMetadata, @Nullable ForwardIndexReader<?> forwardIndex,
       @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
       @Nullable RangeIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
       @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable H3IndexReader h3Index,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index d83ebbfe58..1d798b5101 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -73,6 +73,7 @@ public class IndexLoadingConfig {
   private final Map<String, String> _noDictionaryConfig = new HashMap<>();
   private final Set<String> _varLengthDictionaryColumns = new HashSet<>();
   private Set<String> _onHeapDictionaryColumns = new HashSet<>();
+  private Set<String> _forwardIndexDisabledColumns = new HashSet<>();
   private Map<String, BloomFilterConfig> _bloomFilterConfigs = new HashMap<>();
   private boolean _enableDynamicStarTreeCreation;
   private List<StarTreeIndexConfig> _starTreeIndexConfigs;
@@ -169,6 +170,7 @@ public class IndexLoadingConfig {
     extractTextIndexColumnsFromTableConfig(tableConfig);
     extractFSTIndexColumnsFromTableConfig(tableConfig);
     extractH3IndexConfigsFromTableConfig(tableConfig);
+    extractForwardIndexDisabledColumnsFromTableConfig(tableConfig);
 
     Map<String, List<TimestampIndexGranularity>> timestampIndexConfigs =
         SegmentGeneratorConfig.extractTimestampIndexConfigsFromTableConfig(tableConfig);
@@ -329,6 +331,30 @@ public class IndexLoadingConfig {
     _segmentDirectoryLoader = instanceDataManagerConfig.getSegmentDirectoryLoader();
   }
 
+  /**
+   * Forward index disabled info for each column is specified
+   * using {@link FieldConfig} model of indicating per column
+   * encoding and indexing information. Since IndexLoadingConfig
+   * is created from TableConfig, we extract the no forward index info
+   * from fieldConfigList in TableConfig via the properties bag.
+   * @param tableConfig table config
+   */
+  private void extractForwardIndexDisabledColumnsFromTableConfig(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
+        if (fieldConfigProperties != null) {
+          boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties
+              .getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+          if (forwardIndexDisabled) {
+            _forwardIndexDisabledColumns.add(fieldConfig.getName());
+          }
+        }
+      }
+    }
+  }
+
   /**
    * For tests only.
    */
@@ -350,6 +376,19 @@ public class IndexLoadingConfig {
     return _sortedColumns;
   }
 
+  /**
+   * For tests only.
+   */
+  @VisibleForTesting
+  public void setSortedColumn(String sortedColumn) {
+    if (sortedColumn != null) {
+      _sortedColumns = new ArrayList<>();
+      _sortedColumns.add(sortedColumn);
+    } else {
+      _sortedColumns = Collections.emptyList();
+    }
+  }
+
   public Set<String> getInvertedIndexColumns() {
     return _invertedIndexColumns;
   }
@@ -480,6 +519,14 @@ public class IndexLoadingConfig {
     _onHeapDictionaryColumns = onHeapDictionaryColumns;
   }
 
+  /**
+   * For tests only.
+   */
+  @VisibleForTesting
+  public void setForwardIndexDisabledColumns(Set<String> forwardIndexDisabledColumns) {
+    _forwardIndexDisabledColumns = forwardIndexDisabledColumns;
+  }
+
   public Set<String> getNoDictionaryColumns() {
     return _noDictionaryColumns;
   }
@@ -496,7 +543,7 @@ public class IndexLoadingConfig {
     return _compressionConfigs;
   }
 
-  public Map<String, String> getnoDictionaryConfig() {
+  public Map<String, String> getNoDictionaryConfig() {
     return _noDictionaryConfig;
   }
 
@@ -508,6 +555,10 @@ public class IndexLoadingConfig {
     return _onHeapDictionaryColumns;
   }
 
+  public Set<String> getForwardIndexDisabledColumns() {
+    return _forwardIndexDisabledColumns;
+  }
+
   public Map<String, BloomFilterConfig> getBloomFilterConfigs() {
     return _bloomFilterConfigs;
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index ed09a6f299..db234a817a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -42,6 +42,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCrea
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueUnsortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueSortedForwardIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.fwd.SingleValueUnsortedForwardIndexCreator;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
 import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
@@ -224,6 +225,22 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
       if (columnMetadata != null) {
         // Column exists in the segment, check if we need to update the value.
 
+        if (_segmentWriter != null && !columnMetadata.isAutoGenerated()) {
+          // Check that forward index disabled isn't enabled / disabled on an existing column (not auto-generated).
+          // TODO: Add support for reloading segments when forward index disabled flag is enabled or disabled
+          boolean forwardIndexDisabled = !_segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX);
+          if (forwardIndexDisabled != _indexLoadingConfig.getForwardIndexDisabledColumns()
+              .contains(column)) {
+            String failureMessage =
+                "Forward index disabled in segment: " + forwardIndexDisabled + " for column: " + column
+                    + " does not match forward index disabled flag: "
+                    + _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column) + " in the TableConfig, "
+                    + "setting this flag on new columns or updating this flag is not supported at the moment. Please "
+                    + "backfill or refresh segments to use this feature.";
+            throw new RuntimeException(failureMessage);
+          }
+        }
+
         // Only check for auto-generated column.
         if (!columnMetadata.isAutoGenerated()) {
           continue;
@@ -363,6 +380,13 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
                   argument);
               return false;
             }
+            // TODO: Support creation of derived columns from forward index disabled columns
+            if (!_segmentWriter.hasIndexFor(argument, ColumnIndexType.FORWARD_INDEX)) {
+              throw new UnsupportedOperationException(String.format("Operation not supported! Cannot create a derived "
+                  + "column %s because argument: %s does not have a forward index. Enable forward index and "
+                  + "refresh/backfill the segments to create a derived column from source column %s", column, argument,
+                  argument));
+            }
             argumentsMetadata.add(columnMetadata);
           }
 
@@ -372,6 +396,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
             return false;
           }
 
+          // TODO: Support forward index disabled derived column
+          if (_indexLoadingConfig.getForwardIndexDisabledColumns().contains(column)) {
+            LOGGER.warn("Skip creating forward index disabled derived column: {}", column);
+            return false;
+          }
+
           try {
             createDerivedColumnV1Indices(column, functionEvaluator, argumentsMetadata);
             return true;
@@ -388,8 +418,46 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     return true;
   }
 
+  /**
+   * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
+   * compatibility mismatch. The checks performed are:
+   *     - Validate dictionary is enabled.
+   *     - Validate inverted index is enabled.
+   *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
+   *       multi-value column (since multi-value defaults to index v1).
+   */
+  protected void validateForwardIndexDisabledConfigsIfPresent(String column, boolean forwardIndexDisabled) {
+    if (!forwardIndexDisabled) {
+      return;
+    }
+    LOGGER.warn("Disabling forward index on a new column {} is currently not supported. Treating this as a no-op!",
+        column);
+    FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
+    Preconditions.checkState(_indexLoadingConfig.getInvertedIndexColumns().contains(column),
+          String.format("Inverted index must be enabled for forward index disabled column: %s", column));
+      Preconditions.checkState(!_indexLoadingConfig.getNoDictionaryColumns().contains(column),
+          String.format("Dictionary disabled column: %s cannot disable the forward index", column));
+      if (_indexLoadingConfig.getRangeIndexColumns() != null
+          && _indexLoadingConfig.getRangeIndexColumns().contains(column)) {
+        Preconditions.checkState(fieldSpec.isSingleValueField(),
+            String.format("Multi-value column with range index: %s cannot disable the forward index", column));
+        Preconditions.checkState(_indexLoadingConfig.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+            String.format("Single-value column with range index version < 2: %s cannot disable the forward index",
+                column));
+      }
+  }
+
+  /**
+   * Check and return whether the forward index is disabled for a given column
+   */
+  protected boolean isForwardIndexDisabled(String column) {
+    return _indexLoadingConfig.getForwardIndexDisabledColumns() != null
+        && _indexLoadingConfig.getForwardIndexDisabledColumns().contains(column);
+  }
+
   /**
    * Helper method to create the V1 indices (dictionary and forward index) for a column with default values.
+   * TODO: Add support for handling the forwardIndexDisabled flag. Today this flag is ignored for default columns
    */
   private void createDefaultValueColumnV1Indices(String column)
       throws Exception {
@@ -402,6 +470,12 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     boolean isSingleValue = fieldSpec.isSingleValueField();
     int maxNumberOfMultiValueElements = isSingleValue ? 0 : 1;
 
+    // Validate that the forwardIndexDisabled flag, if enabled, is compatible with other indexes and configs
+    // For now the forwardIndexDisabled flag is ignored for default columns but will be handled as part of reload
+    // changes
+    boolean forwardIndexDisabled = isForwardIndexDisabled(column);
+    validateForwardIndexDisabledConfigsIfPresent(column, forwardIndexDisabled);
+
     Object sortedArray;
     switch (dataType.getStoredType()) {
       case INT:
@@ -465,7 +539,10 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
         }
       }
     } else {
+      // TODO: Add support to disable the forward index if the forwardIndexDisabled flag is true and the column is a
+      //       multi-value column.
       // Multi-value column.
+
       try (
           MultiValueUnsortedForwardIndexCreator mvFwdIndexCreator = new MultiValueUnsortedForwardIndexCreator(_indexDir,
               fieldSpec.getName(), 1/*cardinality*/, totalDocs/*numDocs*/, totalDocs/*totalNumberOfValues*/)) {
@@ -486,6 +563,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
    * TODO:
    *   - Support chained derived column
    *   - Support raw derived column
+   *   - Support forward index disabled derived column
    */
   private void createDerivedColumnV1Indices(String column, FunctionEvaluator functionEvaluator,
       List<ColumnMetadata> argumentsMetadata)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
index b783dc1f0f..7a6f153768 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
@@ -172,6 +172,7 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader<Fi
     return dictIdBuffer;
   }
 
+  @Override
   public int getNumValuesMV(int docId, Context context) {
     int contextDocId = context._docId;
     int contextEndOffset = context._endOffset;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 87f2149d35..a510bfd128 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,6 +42,7 @@ public class PinotSegmentColumnReader implements Closeable {
     DataSource dataSource = indexSegment.getDataSource(column);
     Preconditions.checkArgument(dataSource != null, "Failed to find data source for column: %s", column);
     _forwardIndexReader = dataSource.getForwardIndex();
+    Preconditions.checkArgument(_forwardIndexReader != null, "Forward index disabled for column: %s", column);
     _forwardIndexReaderContext = _forwardIndexReader.createContext();
     _dictionary = dataSource.getDictionary();
     _nullValueVectorReader = dataSource.getNullValueVector();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index c966e8318a..899d736542 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -128,6 +128,7 @@ public class StarTreeLoaderUtils {
             throws IOException {
           // NOTE: Close the indexes managed by the star-tree (dictionary is managed inside the ColumnIndexContainer).
           for (DataSource dataSource : dataSourceMap.values()) {
+            // Forward index cannot be null here
             dataSource.getForwardIndex().close();
           }
         }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index ed6e4d9095..2f15d9b2b4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.tier.TierFactory;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.segment.local.function.FunctionEvaluator;
 import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -847,6 +848,7 @@ public final class TableConfigUtils {
    * Validates the Field Config List in the given TableConfig
    * Ensures that every referred column name exists in the corresponding schema
    * Additional checks for TEXT and FST index types
+   * Validates index compatibility for forward index disabled columns
    */
   private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList,
       @Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) {
@@ -874,6 +876,10 @@ public final class TableConfigUtils {
           default:
             break;
         }
+
+        // Validate the forward index disabled compatibility with other indexes if enabled for this column
+        validateForwardIndexDisabledIndexCompatibility(columnName, fieldConfig, indexingConfigs, noDictionaryColumns,
+            schema);
       }
 
       if (CollectionUtils.isNotEmpty(fieldConfig.getIndexTypes())) {
@@ -902,6 +908,44 @@ public final class TableConfigUtils {
     }
   }
 
+  /**
+   * Validates the compatibility of the indexes if the column has the forward index disabled. Throws exceptions due to
+   * compatibility mismatch. The checks performed are:
+   *     - Validate dictionary is enabled.
+   *     - Validate inverted index is enabled.
+   *     - Validate that either no range index exists for column or the range index version is at least 2 and isn't a
+   *       multi-value column (since mulit-value defaults to index v1).
+   */
+  private static void validateForwardIndexDisabledIndexCompatibility(String columnName, FieldConfig fieldConfig,
+      IndexingConfig indexingConfigs, List<String> noDictionaryColumns, Schema schema) {
+    Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
+    if (fieldConfigProperties == null) {
+      return;
+    }
+
+    boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties.get(FieldConfig.FORWARD_INDEX_DISABLED));
+    if (!forwardIndexDisabled) {
+      return;
+    }
+
+    FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+    Preconditions.checkState(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY
+            || noDictionaryColumns == null || !noDictionaryColumns.contains(columnName),
+        String.format("Forward index disabled column %s must have dictionary enabled", columnName));
+    Preconditions.checkState(indexingConfigs.getInvertedIndexColumns() != null
+            && indexingConfigs.getInvertedIndexColumns().contains(columnName),
+        String.format("Forward index disabled column %s must have inverted index enabled", columnName));
+    if (indexingConfigs.getRangeIndexColumns() != null && indexingConfigs.getRangeIndexColumns().contains(columnName)) {
+      Preconditions.checkState(fieldSpec.isSingleValueField(), String.format("Feature not supported for multi-value "
+          + "columns with range index. Cannot disable forward index for column %s. Disable range index on this "
+          + "column to use this feature", columnName));
+      Preconditions.checkState(indexingConfigs.getRangeIndexVersion() == BitSlicedRangeIndexCreator.VERSION,
+          String.format("Feature not supported for single-value columns with range index version < 2. Cannot disable "
+              + "forward index for column %s. Either disable range index or create range index with"
+              + " version >= 2 to use this feature", columnName));
+    }
+  }
+
   private static void sanitize(TableConfig tableConfig) {
     tableConfig.setIndexingConfig(sanitizeIndexingConfig(tableConfig.getIndexingConfig()));
   }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
index cc00e6ad97..0c09383679 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/LoaderTest.java
@@ -20,11 +20,13 @@ package org.apache.pinot.segment.local.segment.index.loader;
 
 import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
@@ -33,6 +35,7 @@ import org.apache.pinot.segment.local.segment.creator.impl.SegmentCreationDriver
 import org.apache.pinot.segment.local.segment.index.converter.SegmentV1V2ToV3FormatConverter;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
@@ -62,6 +65,7 @@ import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
 import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -75,6 +79,7 @@ public class LoaderTest {
 
   private static final String TEXT_INDEX_COL_NAME = "column5";
   private static final String FST_INDEX_COL_NAME = "column5";
+  private static final String NO_FORWARD_INDEX_COL_NAME = "column4";
 
   private File _avroFile;
   private File _indexDir;
@@ -457,6 +462,160 @@ public class LoaderTest {
     indexSegment.destroy();
   }
 
+  private void constructSegmentWithForwardIndexDisabled(SegmentVersion segmentVersion, boolean enableInvertedIndex)
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    SegmentGeneratorConfig segmentGeneratorConfig =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, INDEX_DIR, "testTable");
+    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
+    List<String> forwardIndexDisabledColumns = new ArrayList<>();
+    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
+    segmentGeneratorConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    if (enableInvertedIndex) {
+      segmentGeneratorConfig.createInvertedIndexForColumn(NO_FORWARD_INDEX_COL_NAME);
+    }
+    segmentGeneratorConfig.setSegmentVersion(segmentVersion);
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    _indexDir = new File(INDEX_DIR, driver.getSegmentName());
+  }
+
+  @Test
+  public void testForwardIndexDisabledLoad()
+      throws Exception {
+    // Tests for scenarios by creating on-disk segment in V3 and then loading
+    // the segment with and without specifying segmentVersion in IndexLoadingConfig
+
+    // create on-disk segment in V3
+    // this generates the segment in V1 but converts to V3 as part of post-creation processing
+    constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, true);
+
+    // check that segment on-disk version is V3 after creation
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v3);
+    // check that V3 index sub-dir exists
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+
+    // CASE 1: don't set the segment version to load in IndexLoadingConfig
+    // there should be no conversion done by ImmutableSegmentLoader and it should
+    // be able to create all index readers with on-disk version V3
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    ImmutableSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig as V3
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is same as the version of segment on disk (V3)
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // Test for scenarios by creating on-disk segment in V1 and then loading
+    // the segment with and without specifying segmentVersion in IndexLoadingConfig
+
+    // create on-disk segment in V1
+    // this generates the segment in V1 and does not convert to V3 as part of post-creation processing
+    constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, true);
+
+    // check that segment on-disk version is V1 after creation
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getVersion(), SegmentVersion.v1);
+    // check that segment v1 dir exists
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    // check that v3 index sub-dir does not exist
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+
+    // CASE 1: don't set the segment version to load in IndexLoadingConfig
+    // there should be no conversion done by ImmutableSegmentLoader and it should
+    // be able to create all index readers with on-disk version V1
+    indexLoadingConfig = new IndexLoadingConfig();
+    forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NO_FORWARD_INDEX_COL_NAME);
+    indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig to V1
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is same as the version of segment on fisk
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    indexSegment.destroy();
+
+    // CASE 3: set the segment version to load in IndexLoadingConfig to V3
+    // there should be conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is different than the version of segment on disk
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3);
+    Assert.assertNull(indexSegment.getForwardIndex(NO_FORWARD_INDEX_COL_NAME));
+    Assert.assertTrue(indexSegment.getSegmentMetadata().getColumnMetadataFor(NO_FORWARD_INDEX_COL_NAME)
+        .hasDictionary());
+    // the index dir should exist in v3 format due to conversion
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    verifyIndexDirIsV3(_indexDir);
+    indexSegment.destroy();
+
+    // Test scenarios to create a column with no forward index but without enabling inverted index for it
+    try {
+      constructSegmentWithForwardIndexDisabled(SegmentVersion.v3, false);
+      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
+    } catch (IllegalStateException e) {
+      assertEquals(String.format("Cannot disable forward index for column %s without inverted index enabled",
+          NO_FORWARD_INDEX_COL_NAME), e.getMessage());
+    }
+
+    try {
+      constructSegmentWithForwardIndexDisabled(SegmentVersion.v1, false);
+      Assert.fail("Disabling forward index without enabling inverted index is not allowed!");
+    } catch (IllegalStateException e) {
+      assertEquals(String.format("Cannot disable forward index for column %s without inverted index enabled",
+          NO_FORWARD_INDEX_COL_NAME), e.getMessage());
+    }
+  }
+
   private void constructSegmentWithTextIndex(SegmentVersion segmentVersion)
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 2bcad84f23..babbe646bc 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -70,6 +70,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -100,6 +101,10 @@ public class SegmentPreProcessorTest {
   // For create fst index tests
   private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
 
+  // For create no forward index column tests
+  private static final String NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV = "newForwardIndexDisabledColumnSV";
+  private static final String NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV = "newForwardIndexDisabledColumnMV";
+
   // For update default value tests.
   private static final String NEW_COLUMNS_SCHEMA1 = "data/newColumnsSchema1.json";
   private static final String NEW_COLUMNS_SCHEMA2 = "data/newColumnsSchema2.json";
@@ -107,6 +112,8 @@ public class SegmentPreProcessorTest {
   private static final String NEW_COLUMNS_SCHEMA_WITH_FST = "data/newColumnsSchemaWithFST.json";
   private static final String NEW_COLUMNS_SCHEMA_WITH_TEXT = "data/newColumnsSchemaWithText.json";
   private static final String NEW_COLUMNS_SCHEMA_WITH_H3_JSON = "data/newColumnsSchemaWithH3Json.json";
+  private static final String NEW_COLUMNS_SCHEMA_WITH_NO_FORWARD_INDEX =
+      "data/newColumnsSchemaWithForwardIndexDisabled.json";
   private static final String NEW_INT_METRIC_COLUMN_NAME = "newIntMetric";
   private static final String NEW_LONG_METRIC_COLUMN_NAME = "newLongMetric";
   private static final String NEW_FLOAT_METRIC_COLUMN_NAME = "newFloatMetric";
@@ -129,6 +136,7 @@ public class SegmentPreProcessorTest {
   private Schema _newColumnsSchemaWithFST;
   private Schema _newColumnsSchemaWithText;
   private Schema _newColumnsSchemaWithH3Json;
+  private Schema _newColumnsSchemaWithForwardIndexDisabled;
 
   @BeforeMethod
   public void setUp()
@@ -186,6 +194,9 @@ public class SegmentPreProcessorTest {
     resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_H3_JSON);
     assertNotNull(resourceUrl);
     _newColumnsSchemaWithH3Json = Schema.fromFile(new File(resourceUrl.getFile()));
+    resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_NO_FORWARD_INDEX);
+    assertNotNull(resourceUrl);
+    _newColumnsSchemaWithForwardIndexDisabled = Schema.fromFile(new File(resourceUrl.getFile()));
   }
 
   @AfterMethod
@@ -344,7 +355,7 @@ public class SegmentPreProcessorTest {
     assertNotNull(columnMetadata);
     checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0);
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, newCompressionType);
+        0, newCompressionType, false);
 
     // Test4: Change compression on RAW index column. Change another index on another column. Check correctness.
     newCompressionType = ChunkCompressionType.ZSTANDARD;
@@ -361,7 +372,7 @@ public class SegmentPreProcessorTest {
     checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26);
     // Check forward index.
     validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true,
-        0, newCompressionType);
+        0, newCompressionType, false);
   }
 
   /**
@@ -463,14 +474,14 @@ public class SegmentPreProcessorTest {
       boolean isSorted, int dictionaryElementSize)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true,
-        isSorted, dictionaryElementSize, true, 0, null);
+        isSorted, dictionaryElementSize, true, 0, null, false);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
       boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, true, 0, null);
+        hasDictionary, isSorted, dictionaryElementSize, true, 0, null, false);
   }
 
   private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
@@ -478,7 +489,7 @@ public class SegmentPreProcessorTest {
       int maxNumberOfMultiValues)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null);
+        hasDictionary, isSorted, dictionaryElementSize, isSingleValue, maxNumberOfMultiValues, null, false);
   }
 
   private void checkForwardIndexCreation(String column, int cardinality, int bits, Schema schema,
@@ -486,12 +497,13 @@ public class SegmentPreProcessorTest {
       ChunkCompressionType expectedCompressionType)
       throws Exception {
     createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, column, cardinality, bits, schema, isAutoGenerated,
-        hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType);
+        hasDictionary, isSorted, dictionaryElementSize, true, 0, expectedCompressionType, false);
   }
 
   private void createAndValidateIndex(ColumnIndexType indexType, String column, int cardinality, int bits,
       Schema schema, boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
-      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType,
+      boolean forwardIndexDisabled)
       throws Exception {
 
     try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
@@ -500,13 +512,14 @@ public class SegmentPreProcessorTest {
         SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, schema)) {
       processor.process();
       validateIndex(indexType, column, cardinality, bits, schema, isAutoGenerated, hasDictionary, isSorted,
-          dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType);
+          dictionaryElementSize, isSingleValued, maxNumberOfMultiValues, expectedCompressionType, forwardIndexDisabled);
     }
   }
 
   private void validateIndex(ColumnIndexType indexType, String column, int cardinality, int bits, Schema schema,
       boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize,
-      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType)
+      boolean isSingleValued, int maxNumberOfMultiValues, ChunkCompressionType expectedCompressionType,
+      boolean forwardIndexDisabled)
       throws Exception {
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
@@ -525,8 +538,18 @@ public class SegmentPreProcessorTest {
         .load(_indexDir.toURI(),
             new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
         SegmentDirectory.Reader reader = segmentDirectory1.createReader()) {
-      assertTrue(reader.hasIndexFor(column, indexType));
-      assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+      if (indexType != ColumnIndexType.FORWARD_INDEX || !forwardIndexDisabled) {
+        assertTrue(reader.hasIndexFor(column, indexType));
+      }
+      if (isSingleValued || !forwardIndexDisabled) {
+        assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+        if (isSingleValued && forwardIndexDisabled) {
+          assertFalse(reader.hasIndexFor(column, ColumnIndexType.INVERTED_INDEX));
+        }
+      } else {
+        // Default SV columns will have the default sorted forward index so only assert for MV columns here
+        assertFalse(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+      }
 
       // Check if the raw forward index compressionType is correct.
       if (expectedCompressionType != null) {
@@ -1454,4 +1477,312 @@ public class SegmentPreProcessorTest {
         new HashSet<>(Collections.singletonList("newColumnX"))));
     return testCases;
   }
+
+  /**
+   * Test to check the behavior of the forward index disabled feature when enabled on a new SV column
+   */
+  @Test
+  public void testForwardIndexDisabledOnNewColumnsSV()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, true);
+
+    // Create a segment in V1, add a column with no forward index enabled
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Forward index is always going to be present for default SV columns with forward index disabled. This is because
+    // such default columns are going to be sorted and the forwardIndexDisabled flag is a no-op for sorted columns
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, true);
+
+    // Add the column to the no dictionary column list
+    Set<String> existingNoDictionaryColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    _indexLoadingConfig.setNoDictionaryColumns(forwardIndexDisabledColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    try {
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false);
+      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
+          + "forward index");
+    }
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false);
+      Assert.fail("Forward index cannot be disabled for dictionary disabled columns!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Dictionary disabled column: newForwardIndexDisabledColumnSV cannot disable the "
+          + "forward index");
+    }
+
+    // Reset the no dictionary columns
+    _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
+
+    // Add the column to the sorted list
+    List<String> existingSortedColumns = _indexLoadingConfig.getSortedColumns();
+    _indexLoadingConfig.setSortedColumn(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Column should be sorted and should be created successfully since for SV columns the forwardIndexDisabled flag
+    // is a no-op
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, true,
+        4, true, 0, null, false);
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Column should be sorted and should be created successfully since for SV columns the forwardIndexDisabled flag
+    // is a no-op
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, true,
+        4, true, 0, null, false);
+
+    // Reset the sorted column list
+    _indexLoadingConfig.setSortedColumn(existingSortedColumns.isEmpty() ? null : existingSortedColumns.get(0));
+
+    // Remove the column from the inverted index column list and validate that it fails
+    invertedIndexColumns.removeAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
+          _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false);
+      Assert.fail("Forward index cannot be disabled without enabling the inverted index!");
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
+          + "newForwardIndexDisabledColumnSV");
+    }
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    try {
+      createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+          NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, true,
+          4, true, 0, null, false);
+    } catch (IllegalStateException e) {
+      assertEquals(e.getMessage(), "Inverted index must be enabled for forward index disabled column: "
+          + "newForwardIndexDisabledColumnSV");
+    }
+
+    _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
+  }
+
+  /**
+   * Test to check the behavior of the forward index disabled feature when enabled on a new MV column
+   * TODO: Add support for handling the forwardIndexDisabled flag on the reload path then enable and fix this test
+   */
+  @Test(enabled = false)
+  public void testForwardIndexDisabledOnNewColumnsMV()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Validate that the forward index doesn't exist and that inverted index does exist
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+    createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+
+    // Create a segment in V1, add a column with no forward index enabled
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    // Validate that the forward index doesn't exist and that inverted index does exist
+    createAndValidateIndex(ColumnIndexType.FORWARD_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+    createAndValidateIndex(ColumnIndexType.INVERTED_INDEX, NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1,
+        _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true);
+
+    // Add the column to the no dictionary column list
+    Set<String> existingNoDictionaryColumns = _indexLoadingConfig.getNoDictionaryColumns();
+    _indexLoadingConfig.setNoDictionaryColumns(forwardIndexDisabledColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    // Reset the no dictionary columns
+    _indexLoadingConfig.setNoDictionaryColumns(existingNoDictionaryColumns);
+
+    // Remove the column from the inverted index column list and validate that it fails
+    invertedIndexColumns.removeAll(forwardIndexDisabledColumns);
+    _indexLoadingConfig.setInvertedIndexColumns(invertedIndexColumns);
+
+    // Create a segment in V3, add a new column with no forward index enabled
+    constructV3Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    constructV1Segment();
+    segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV);
+    // should be null since column does not exist in the schema
+    assertNull(columnMetadata);
+
+    assertThrows(IllegalStateException.class, () -> createAndValidateIndex(ColumnIndexType.FORWARD_INDEX,
+        NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV, 1, 1, _newColumnsSchemaWithForwardIndexDisabled, true, true, false,
+        4, false, 1, null, false));
+
+    _indexLoadingConfig.setForwardIndexDisabledColumns(new HashSet<>());
+  }
+
+  /**
+   * Test to check the behavior of the forward index disabled feature
+   * when enabled on an existing dictionary encoded column
+   */
+  @Test
+  public void testForwardIndexDisabledOnExistingColumnDictEncoded()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(EXISTING_STRING_COL_DICT);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+    assertNotNull(columnMetadata);
+
+    SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+    SegmentPreProcessor v3Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled);
+    expectThrows(RuntimeException.class, v3Processor::process);
+
+    constructV1Segment();
+    segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(),
+        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+    SegmentPreProcessor v1Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled);
+    expectThrows(RuntimeException.class, v1Processor::process);
+  }
+
+  /**
+   * Test to check the behavior of the forward index disabled feature
+   * when enabled on an existing raw column
+   */
+  @Test
+  public void testForwardIndexDisabledOnExistingColumnRaw()
+      throws Exception {
+    Set<String> forwardIndexDisabledColumns = new HashSet<>();
+    forwardIndexDisabledColumns.add(EXISTING_STRING_COL_RAW);
+    _indexLoadingConfig.setForwardIndexDisabledColumns(forwardIndexDisabledColumns);
+    Set<String> invertedIndexColumns = _indexLoadingConfig.getInvertedIndexColumns();
+    invertedIndexColumns.addAll(forwardIndexDisabledColumns);
+
+    constructV3Segment();
+    SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
+    ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW);
+    assertNotNull(columnMetadata);
+
+    SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
+        .load(_indexDir.toURI(),
+            new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+    SegmentPreProcessor v3Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled);
+    expectThrows(RuntimeException.class, v3Processor::process);
+
+    constructV1Segment();
+    segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(),
+        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
+    SegmentPreProcessor v1Processor =
+        new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled);
+    expectThrows(RuntimeException.class, v1Processor::process);
+  }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
index beee1f8b73..47ea357894 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandlerTest.java
@@ -70,7 +70,7 @@ public class BaseDefaultColumnHandlerTest {
     driver.build();
     _segmentDirectory = new File(_indexDir, driver.getSegmentName());
     _committedSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory);
-    _writer = new SegmentLocalFSDirectory(_indexDir, _committedSegmentMetadata, ReadMode.mmap).createWriter();
+    _writer = new SegmentLocalFSDirectory(_segmentDirectory, _committedSegmentMetadata, ReadMode.mmap).createWriter();
   }
 
   @AfterMethod
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 0ebc6b9239..a563e9be21 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -895,6 +895,100 @@ public class TableConfigUtilsTest {
     } catch (Exception e) {
       Assert.assertEquals(e.getMessage(), "Set compression codec to null for dictionary encoding type");
     }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
+    try {
+      // Enable forward index disabled flag for a raw column
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, null, null, null, null,
+          fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for myCol1 without dictionary and with forward index disabled");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol1 must have dictionary enabled");
+    }
+
+    try {
+      // Enable forward index disabled flag for a column without inverted index
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY, null, null, null, null,
+          fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for conflicting myCol2 with forward index disabled but no inverted index");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Forward index disabled column myCol2 must have inverted index enabled");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2")).build();
+    try {
+      // Enable forward index disabled flag for a column with inverted index
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.fail("Should not fail as myCol2 has forward index disabled but inverted index enabled");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2"))
+        .setSortedColumn("myCol2").build();
+    try {
+      // Enable forward index disabled flag for a column with inverted index and is sorted
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+          FieldConfig.IndexType.INVERTED, null, null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+    } catch (Exception e) {
+      Assert.fail("Should not fail for myCol2 with forward index disabled but is sorted, this is a no-op");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).setInvertedIndexColumns(Arrays.asList("myCol2"))
+        .setRangeIndexColumns(Arrays.asList("myCol2")).build();
+    try {
+      // Enable forward index disabled flag for a multi-value column with inverted index and range index
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol2", FieldConfig.EncodingType.DICTIONARY,
+          FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
+          null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for MV myCol2 with forward index disabled but has range and inverted index");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Feature not supported for multi-value columns with range index. "
+          + "Cannot disable forward index for column myCol2. Disable range index on this column to use this feature");
+    }
+
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(Arrays.asList("myCol1")).setRangeIndexColumns(Arrays.asList("myCol1")).build();
+    try {
+      // Enable forward index disabled flag for a singe-value column with inverted index and range index v1
+      Map<String, String> fieldConfigProperties = new HashMap<>();
+      fieldConfigProperties.put(FieldConfig.FORWARD_INDEX_DISABLED, Boolean.TRUE.toString());
+      FieldConfig fieldConfig = new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY,
+          FieldConfig.IndexType.INVERTED, Arrays.asList(FieldConfig.IndexType.INVERTED, FieldConfig.IndexType.RANGE),
+          null, null, fieldConfigProperties);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      tableConfig.getIndexingConfig().setRangeIndexVersion(1);
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for SV myCol1 with forward index disabled but has range v1 and inverted index");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "Feature not supported for single-value columns with range index version "
+          + "< 2. Cannot disable forward index for column myCol1. Either disable range index or create range index "
+          + "with version >= 2 to use this feature");
+    }
   }
 
   @Test
diff --git a/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json
new file mode 100644
index 0000000000..83ac30f529
--- /dev/null
+++ b/pinot-segment-local/src/test/resources/data/newColumnsSchemaWithForwardIndexDisabled.json
@@ -0,0 +1,77 @@
+{
+  "schemaName": "testDataMV",
+  "dimensionFieldSpecs": [
+    {
+      "name": "column1",
+      "dataType": "INT"
+    },
+    {
+      "name": "column2",
+      "dataType": "INT"
+    },
+    {
+      "name": "column3",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column4",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column5",
+      "dataType": "STRING"
+    },
+    {
+      "name": "newForwardIndexDisabledColumnSV",
+      "dataType": "STRING"
+    },
+    {
+      "name": "newForwardIndexDisabledColumnMV",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
+      "name": "column6",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "column7",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "column8",
+      "dataType": "INT"
+    },
+    {
+      "name": "column9",
+      "dataType": "INT"
+    },
+    {
+      "name": "column10",
+      "dataType": "INT"
+    },
+    {
+      "name": "column13",
+      "dataType": "INT"
+    },
+    {
+      "name": "weeksSinceEpochSunday",
+      "dataType": "INT"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "count",
+      "dataType": "INT"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "timeType": "DAYS",
+      "dataType": "INT",
+      "name": "daysSinceEpoch"
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
index 732ef48cec..b2e2d75c98 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreationContext.java
@@ -64,6 +64,8 @@ public interface IndexCreationContext {
 
   Comparable<?> getMaxValue();
 
+  boolean forwardIndexDisabled();
+
   final class Builder {
     private File _indexDir;
     private int _lengthOfLongestEntry;
@@ -78,6 +80,7 @@ public interface IndexCreationContext {
     private boolean _hasDictionary = true;
     private Comparable<?> _minValue;
     private Comparable<?> _maxValue;
+    private boolean _forwardIndexDisabled;
 
     public Builder withColumnIndexCreationInfo(ColumnIndexCreationInfo columnIndexCreationInfo) {
       return withLengthOfLongestEntry(columnIndexCreationInfo.getLengthOfLongestEntry())
@@ -161,10 +164,15 @@ public interface IndexCreationContext {
       return this;
     }
 
+    public Builder withforwardIndexDisabled(boolean forwardIndexDisabled) {
+      _forwardIndexDisabled = forwardIndexDisabled;
+      return this;
+    }
+
     public Common build() {
       return new Common(Objects.requireNonNull(_indexDir), _lengthOfLongestEntry, _maxNumberOfMultiValueElements,
           _maxRowLengthInBytes, _onHeap, Objects.requireNonNull(_fieldSpec), _sorted, _cardinality,
-          _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, _maxValue);
+          _totalNumberOfEntries, _totalDocs, _hasDictionary, _minValue, _maxValue, _forwardIndexDisabled);
     }
   }
 
@@ -187,11 +195,13 @@ public interface IndexCreationContext {
     private final boolean _hasDictionary;
     private final Comparable<?> _minValue;
     private final Comparable<?> _maxValue;
+    private final boolean _forwardIndexDisabled;
 
     public Common(File indexDir, int lengthOfLongestEntry,
         int maxNumberOfMultiValueElements, int maxRowLengthInBytes, boolean onHeap,
         FieldSpec fieldSpec, boolean sorted, int cardinality, int totalNumberOfEntries,
-        int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue) {
+        int totalDocs, boolean hasDictionary, Comparable<?> minValue, Comparable<?> maxValue,
+        boolean forwardIndexDisabled) {
       _indexDir = indexDir;
       _lengthOfLongestEntry = lengthOfLongestEntry;
       _maxNumberOfMultiValueElements = maxNumberOfMultiValueElements;
@@ -205,6 +215,7 @@ public interface IndexCreationContext {
       _hasDictionary = hasDictionary;
       _minValue = minValue;
       _maxValue = maxValue;
+      _forwardIndexDisabled = forwardIndexDisabled;
     }
 
     public FieldSpec getFieldSpec() {
@@ -261,6 +272,11 @@ public interface IndexCreationContext {
       return _maxValue;
     }
 
+    @Override
+    public boolean forwardIndexDisabled() {
+      return _forwardIndexDisabled;
+    }
+
     public BloomFilter forBloomFilter(BloomFilterConfig bloomFilterConfig) {
       return new BloomFilter(this, bloomFilterConfig);
     }
@@ -367,6 +383,11 @@ public interface IndexCreationContext {
     public Comparable getMaxValue() {
       return _delegate.getMaxValue();
     }
+
+    @Override
+    public boolean forwardIndexDisabled() {
+      return _delegate.forwardIndexDisabled();
+    }
   }
 
   class BloomFilter extends Wrapper {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
index b5539b8b17..da5fbc9dda 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/SegmentGeneratorConfig.java
@@ -82,6 +82,7 @@ public class SegmentGeneratorConfig implements Serializable {
   private final List<String> _textIndexCreationColumns = new ArrayList<>();
   private final List<String> _fstIndexCreationColumns = new ArrayList<>();
   private final Map<String, JsonIndexConfig> _jsonIndexConfigs = new HashMap<>();
+  private final List<String> _forwardIndexDisabledColumns = new ArrayList<>();
   private final Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
   private final Map<String, List<TimestampIndexGranularity>> _timestampIndexConfigs = new HashMap<>();
   private final List<String> _columnSortOrder = new ArrayList<>();
@@ -232,6 +233,7 @@ public class SegmentGeneratorConfig implements Serializable {
       extractFSTIndexColumnsFromTableConfig(tableConfig);
       extractH3IndexConfigsFromTableConfig(tableConfig);
       extractCompressionCodecConfigsFromTableConfig(tableConfig);
+      extractForwardIndexDisabledColumnsFromTableConfig(tableConfig);
 
       _fstTypeForFSTIndex = indexingConfig.getFSTIndexType();
       _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
@@ -360,6 +362,30 @@ public class SegmentGeneratorConfig implements Serializable {
     }
   }
 
+  /**
+   * Forward index disabled info for each column is specified
+   * using {@link FieldConfig} model of indicating per column
+   * encoding and indexing information. Since SegmentGeneratorConfig
+   * is created from TableConfig, we extract the forward index disabled info
+   * from fieldConfigList in TableConfig via the properties bag.
+   * @param tableConfig table config
+   */
+  private void extractForwardIndexDisabledColumnsFromTableConfig(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        Map<String, String> fieldConfigProperties = fieldConfig.getProperties();
+        if (fieldConfigProperties != null) {
+          boolean forwardIndexDisabled = Boolean.parseBoolean(fieldConfigProperties
+              .getOrDefault(FieldConfig.FORWARD_INDEX_DISABLED, FieldConfig.DEFAULT_FORWARD_INDEX_DISABLED));
+          if (forwardIndexDisabled) {
+            _forwardIndexDisabledColumns.add(fieldConfig.getName());
+          }
+        }
+      }
+    }
+  }
+
   public Map<String, String> getCustomProperties() {
     return _customProperties;
   }
@@ -426,6 +452,10 @@ public class SegmentGeneratorConfig implements Serializable {
     return _jsonIndexConfigs;
   }
 
+  public List<String> getForwardIndexDisabledColumns() {
+    return _forwardIndexDisabledColumns;
+  }
+
   public Map<String, H3IndexConfig> getH3IndexConfigs() {
     return _h3IndexConfigs;
   }
@@ -461,6 +491,20 @@ public class SegmentGeneratorConfig implements Serializable {
     }
   }
 
+  @VisibleForTesting
+  public void setRangeIndexCreationColumns(List<String> rangeIndexCreationColumns) {
+    if (rangeIndexCreationColumns != null) {
+      _rangeIndexCreationColumns.addAll(rangeIndexCreationColumns);
+    }
+  }
+
+  @VisibleForTesting
+  public void setForwardIndexDisabledColumns(List<String> forwardIndexDisabledColumns) {
+    if (forwardIndexDisabledColumns != null) {
+      _forwardIndexDisabledColumns.addAll(forwardIndexDisabledColumns);
+    }
+  }
+
   @VisibleForTesting
   public void setColumnProperties(Map<String, Map<String, String>> columnProperties) {
     _columnProperties = columnProperties;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index 82bae1b46c..08215a84b9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -47,6 +47,9 @@ public class FieldConfig extends BaseJsonConfig {
   // "native" for native, default is Lucene
   public static final String TEXT_FST_TYPE = "fstType";
   public static final String TEXT_NATIVE_FST_LITERAL = "native";
+  // Config to disable forward index
+  public static final String FORWARD_INDEX_DISABLED = "forwardIndexDisabled";
+  public static final String DEFAULT_FORWARD_INDEX_DISABLED = Boolean.FALSE.toString();
 
   private final String _name;
   private final EncodingType _encodingType;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
index 0367203eb9..fc937bc1d0 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/DictionaryToRawIndexConverter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.segment.converter;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -295,7 +296,9 @@ public class DictionaryToRawIndexConverter {
   private void convertOneColumn(IndexSegment segment, String column, File newSegment)
       throws IOException {
     DataSource dataSource = segment.getDataSource(column);
-    ForwardIndexReader reader = dataSource.getForwardIndex();
+    ForwardIndexReader forwardIndexReader = dataSource.getForwardIndex();
+    Preconditions.checkState(forwardIndexReader != null,
+        "Forward index disabled for column: %s, cannot convert column!", column);
     Dictionary dictionary = dataSource.getDictionary();
 
     if (dictionary == null) {
@@ -317,36 +320,36 @@ public class DictionaryToRawIndexConverter {
     try (ForwardIndexCreator rawIndexCreator = DefaultIndexCreatorProvider
         .getRawIndexCreatorForSVColumn(newSegment, compressionType, column, storedType, numDocs, lengthOfLongestEntry,
             false, BaseChunkSVForwardIndexWriter.DEFAULT_VERSION);
-        ForwardIndexReaderContext readerContext = reader.createContext()) {
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
       switch (storedType) {
         case INT:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putInt(dictionary.getIntValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putInt(dictionary.getIntValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case LONG:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putLong(dictionary.getLongValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putLong(dictionary.getLongValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case FLOAT:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putFloat(dictionary.getFloatValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putFloat(dictionary.getFloatValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case DOUBLE:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putDouble(dictionary.getDoubleValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putDouble(dictionary.getDoubleValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case STRING:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putString(dictionary.getStringValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putString(dictionary.getStringValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         case BYTES:
           for (int docId = 0; docId < numDocs; docId++) {
-            rawIndexCreator.putBytes(dictionary.getBytesValue(reader.getDictId(docId, readerContext)));
+            rawIndexCreator.putBytes(dictionary.getBytesValue(forwardIndexReader.getDictId(docId, readerContext)));
           }
           break;
         default:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org