You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2020/12/25 01:58:50 UTC

[incubator-pinot] branch master updated: Add FST index which works on top of REGEXP_LIKE operator. (#6120)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb6c14  Add FST index which works on top of REGEXP_LIKE operator. (#6120)
1bb6c14 is described below

commit 1bb6c149ee4412d4e5da3a3b23565d76378df4d4
Author: pradeepgv42 <66...@users.noreply.github.com>
AuthorDate: Thu Dec 24 17:58:33 2020 -0800

    Add FST index which works on top of REGEXP_LIKE operator. (#6120)
    
    This index is set to enable only on dictionary encoded string column.
    
    Co-authored-by: Pradeep Gopanapalli Venkata <pr...@Pradeeps-MacBook-Pro.local>
---
 .../org/apache/pinot/core/common/DataSource.java   |   5 +
 .../realtime/HLRealtimeSegmentDataManager.java     |   4 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   9 +-
 .../generator/SegmentGeneratorConfig.java          |  23 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |  31 +-
 .../operator/filter/BitmapBasedFilterOperator.java |   6 +-
 .../core/operator/filter/FilterOperatorUtils.java  |   6 +
 .../FSTBasedRegexpPredicateEvaluatorFactory.java   | 156 +++++++
 .../org/apache/pinot/core/plan/FilterPlanNode.java |  31 ++
 .../converter/RealtimeSegmentConverter.java        |   7 +-
 .../RealtimeSegmentSegmentCreationDataSource.java  |   3 +-
 .../core/realtime/impl/RealtimeSegmentConfig.java  |  24 +-
 .../RealtimeLuceneTextIndexReader.java             |   6 +
 .../pinot/core/segment/creator/TextIndexType.java  |   3 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  35 +-
 .../core/segment/creator/impl/V1Constants.java     |   2 +
 .../impl/inv/text/LuceneFSTIndexCreator.java       | 103 +++++
 .../segment/index/column/ColumnIndexContainer.java |   5 +
 .../index/column/PhysicalColumnIndexContainer.java |  22 +
 .../converter/SegmentV1V2ToV3FormatConverter.java  |  11 +
 .../segment/index/datasource/BaseDataSource.java   |  18 +-
 .../index/datasource/ImmutableDataSource.java      |   1 +
 .../index/datasource/MutableDataSource.java        |  17 +-
 .../segment/index/loader/IndexLoadingConfig.java   |  22 +
 .../segment/index/loader/SegmentPreProcessor.java  |   9 +
 .../defaultcolumn/BaseDefaultColumnHandler.java    |   3 +-
 .../invertedindex/LuceneFSTIndexHandler.java       | 157 +++++++
 .../segment/index/metadata/ColumnMetadata.java     |  20 +-
 .../index/readers/LuceneFSTIndexReader.java        |  82 ++++
 .../segment/index/readers/TextIndexReader.java     |   8 +
 .../index/readers/text/LuceneTextIndexReader.java  |   6 +
 .../pinot/core/segment/store/ColumnIndexType.java  |   1 +
 .../core/segment/store/FilePerIndexDirectory.java  |   7 +
 .../core/segment/store/SegmentDirectoryPaths.java  |   9 +
 .../segment/store/SingleFileIndexDirectory.java    |   3 +
 .../virtualcolumn/VirtualColumnIndexContainer.java |   5 +
 .../core/startree/v2/store/StarTreeDataSource.java |   4 +-
 .../apache/pinot/core/util/TableConfigUtils.java   |  16 +-
 .../org/apache/pinot/core/util/fst/FSTBuilder.java |  65 +++
 .../pinot/core/util/fst/PinotBufferIndexInput.java |  89 ++++
 .../apache/pinot/core/util/fst/RegexpMatcher.java  | 164 +++++++
 .../index/creator/LuceneFSTIndexCreatorTest.java   |  74 ++++
 .../core/segment/index/loader/LoaderTest.java      | 139 ++++++
 .../index/loader/SegmentPreProcessorTest.java      | 105 ++++-
 .../pinot/core/util/TableConfigUtilsTest.java      |  44 ++
 .../queries/FSTBasedRegexpLikeQueriesTest.java     | 477 +++++++++++++++++++++
 .../pinot/queries/TextSearchQueriesTest.java       |   3 +-
 .../java/org/apache/pinot/util/FSTBuilderTest.java | 107 +++++
 .../src/test/resources/data/newColumnsSchema4.json |  84 ++++
 .../apache/pinot/spi/config/table/FieldConfig.java |   2 +-
 50 files changed, 2182 insertions(+), 51 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
index 75f0513..23234c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
@@ -67,6 +67,11 @@ public interface DataSource {
   TextIndexReader getTextIndex();
 
   /**
+   * Returns the FST index for the column if exists, or {@code null} if not.
+   */
+  TextIndexReader getFSTIndex();
+
+  /**
    * Returns the bloom filter for the column if exists, or {@code null} if not.
    */
   @Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index ab608fd..015839a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -289,8 +289,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           RealtimeSegmentConverter converter =
               new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), schema,
                   _tableNameWithType, tableConfig, realtimeSegmentZKMetadata.getSegmentName(), _sortedColumn,
-                  _invertedIndexColumns, Collections.emptyList(), _noDictionaryColumns, _varLengthDictionaryColumns,
-                  indexingConfig.isNullHandlingEnabled());
+                  _invertedIndexColumns, Collections.emptyList(), Collections.emptyList(), _noDictionaryColumns,
+                  _varLengthDictionaryColumns, indexingConfig.isNullHandlingEnabled());
 
           _segmentLogger.info("Trying to build segment");
           final long buildStartTime = System.nanoTime();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 62e6d5b..9656fd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -257,6 +257,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _tableNameWithType;
   private final List<String> _invertedIndexColumns;
   private final List<String> _textIndexColumns;
+  private final List<String> _fstIndexColumns;
   private final List<String> _noDictionaryColumns;
   private final List<String> _varLengthDictionaryColumns;
   private final String _sortedColumn;
@@ -757,8 +758,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       RealtimeSegmentConverter converter =
           new RealtimeSegmentConverter(_realtimeSegment, tempSegmentFolder.getAbsolutePath(), _schema,
               _tableNameWithType, _tableConfig, _segmentZKMetadata.getSegmentName(), _sortedColumn,
-              _invertedIndexColumns, _textIndexColumns, _noDictionaryColumns, _varLengthDictionaryColumns,
-              _nullHandlingEnabled);
+              _invertedIndexColumns, _textIndexColumns, _fstIndexColumns, _noDictionaryColumns,
+              _varLengthDictionaryColumns, _nullHandlingEnabled);
       segmentLogger.info("Trying to build segment");
       try {
         converter.build(_segmentVersion, _serverMetrics);
@@ -1186,6 +1187,9 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns();
     _textIndexColumns = new ArrayList<>(textIndexColumns);
 
+    Set<String> fstIndexColumns = indexLoadingConfig.getFSTIndexColumns();
+    _fstIndexColumns = new ArrayList<>(fstIndexColumns);
+
     // Start new realtime segment
     String consumerDir = realtimeTableDataManager.getConsumerDir();
     RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
@@ -1195,6 +1199,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             .setNoDictionaryColumns(indexLoadingConfig.getNoDictionaryColumns())
             .setVarLengthDictionaryColumns(indexLoadingConfig.getVarLengthDictionaryColumns())
             .setInvertedIndexColumns(invertedIndexColumns).setTextIndexColumns(textIndexColumns)
+            .setFSTIndexColumns(fstIndexColumns)
             .setRealtimeSegmentZKMetadata(segmentZKMetadata).setOffHeap(_isOffHeap).setMemoryManager(_memoryManager)
             .setStatsHistory(realtimeTableDataManager.getStatsHistory())
             .setAggregateMetrics(indexingConfig.isAggregateMetrics()).setNullHandlingEnabled(_nullHandlingEnabled)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 1d3324f..5b2cde8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -70,6 +70,7 @@ public class SegmentGeneratorConfig {
   private Map<String, ChunkCompressorFactory.CompressionType> _rawIndexCompressionType = new HashMap<>();
   private List<String> _invertedIndexCreationColumns = new ArrayList<>();
   private List<String> _textIndexCreationColumns = new ArrayList<>();
+  private List<String> _fstIndexCreationColumns = new ArrayList<>();
   private List<String> _columnSortOrder = new ArrayList<>();
   private List<String> _varLengthDictionaryColumns = new ArrayList<>();
   private String _inputFilePath = null;
@@ -178,6 +179,7 @@ public class SegmentGeneratorConfig {
       }
 
       extractTextIndexColumnsFromTableConfig(tableConfig);
+      extractFSTIndexColumnsFromTableConfig(tableConfig);
 
       _nullHandlingEnabled = indexingConfig.isNullHandlingEnabled();
     }
@@ -224,6 +226,17 @@ public class SegmentGeneratorConfig {
     }
   }
 
+  private void extractFSTIndexColumnsFromTableConfig(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
+    if (fieldConfigList != null) {
+      for (FieldConfig fieldConfig : fieldConfigList) {
+        if (fieldConfig.getIndexType() == FieldConfig.IndexType.FST) {
+          _fstIndexCreationColumns.add(fieldConfig.getName());
+        }
+      }
+    }
+  }
+
   public Map<String, String> getCustomProperties() {
     return _customProperties;
   }
@@ -273,6 +286,10 @@ public class SegmentGeneratorConfig {
     return _textIndexCreationColumns;
   }
 
+  public List<String> getFSTIndexCreationColumns() {
+    return _fstIndexCreationColumns;
+  }
+
   public List<String> getColumnSortOrder() {
     return _columnSortOrder;
   }
@@ -305,6 +322,12 @@ public class SegmentGeneratorConfig {
     _columnProperties = columnProperties;
   }
 
+  public void setFSTIndexCreationColumns(List<String> fstIndexCreationColumns) {
+    if (fstIndexCreationColumns != null) {
+      _fstIndexCreationColumns.addAll(fstIndexCreationColumns);
+    }
+  }
+
   public void setColumnSortOrder(List<String> sortOrder) {
     Preconditions.checkNotNull(sortOrder);
     _columnSortOrder.addAll(sortOrder);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index d0f391f..fb839c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -63,6 +63,7 @@ import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.apache.pinot.core.segment.index.readers.MutableForwardIndex;
 import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
 import org.apache.pinot.core.segment.index.readers.ValidDocIndexReaderImpl;
+import org.apache.pinot.core.segment.index.readers.TextIndexReader;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnContext;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
@@ -220,6 +221,7 @@ public class MutableSegmentImpl implements MutableSegment {
     Set<String> noDictionaryColumns = config.getNoDictionaryColumns();
     Set<String> invertedIndexColumns = config.getInvertedIndexColumns();
     Set<String> textIndexColumns = config.getTextIndexColumns();
+    Set<String> fstIndexColumns = config.getFSTIndexColumns();
 
     int avgNumMultiValues = config.getAvgNumMultiValues();
 
@@ -327,8 +329,10 @@ public class MutableSegmentImpl implements MutableSegment {
 
       // TODO: Support range index and bloom filter for mutable segment
       _indexContainerMap.put(column,
-          new IndexContainer(fieldSpec, partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
-              invertedIndexReader, null, textIndex, null, nullValueVector));
+          new IndexContainer(fieldSpec, fstIndexColumns.contains(column),
+              partitionFunction, partitions, new NumValuesInfo(), forwardIndex, dictionary,
+              invertedIndexReader, null, textIndex, null,
+              null, nullValueVector));
     }
 
     if (_realtimeLuceneReaders != null) {
@@ -1046,11 +1050,16 @@ public class MutableSegmentImpl implements MutableSegment {
     int _dictId = Integer.MIN_VALUE;
     int[] _dictIds;
 
-    IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
-        @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
-        @Nullable MutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
-        @Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex,
-        @Nullable BloomFilterReader bloomFilter, @Nullable MutableNullValueVector nullValueVector) {
+    boolean _fstIndexEnabled;
+
+    IndexContainer(
+        FieldSpec fieldSpec, boolean fstIndexEnabled,
+        @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions,
+        NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex, @Nullable MutableDictionary dictionary,
+        @Nullable RealtimeInvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
+        @Nullable RealtimeLuceneTextIndexReader textIndex, @Nullable TextIndexReader fstIndex,
+        @Nullable BloomFilterReader bloomFilter,
+        @Nullable MutableNullValueVector nullValueVector) {
       _fieldSpec = fieldSpec;
       _partitionFunction = partitionFunction;
       _partitions = partitions;
@@ -1062,12 +1071,16 @@ public class MutableSegmentImpl implements MutableSegment {
       _textIndex = textIndex;
       _bloomFilter = bloomFilter;
       _nullValueVector = nullValueVector;
+      _fstIndexEnabled = fstIndexEnabled;
     }
 
     DataSource toDataSource() {
+      // The FST index is currently not built for consuming segments; it is generated only when the segment is
+      // rolled out.
       return new MutableDataSource(_fieldSpec, _numDocsIndexed, _numValuesInfo._numValues,
-          _numValuesInfo._maxNumValuesPerMVEntry, _partitionFunction, _partitions, _minValue, _maxValue, _forwardIndex,
-          _dictionary, _invertedIndex, _rangeIndex, _textIndex, _bloomFilter, _nullValueVector);
+          _numValuesInfo._maxNumValuesPerMVEntry, _fstIndexEnabled, _partitionFunction, _partitions, _minValue,
+          _maxValue, _forwardIndex, _dictionary, _invertedIndex, _rangeIndex, _textIndex, null, _bloomFilter,
+          _nullValueVector);
     }
 
     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
index d9c25e1..6831966 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.filter;
 
 import com.google.common.base.Preconditions;
 import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.EmptyFilterBlock;
 import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
@@ -66,8 +67,9 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator {
 
     int[] dictIds = _exclusive ? _predicateEvaluator.getNonMatchingDictIds() : _predicateEvaluator.getMatchingDictIds();
     int numDictIds = dictIds.length;
-    // NOTE: PredicateEvaluator without matching/non-matching dictionary ids should not reach here.
-    Preconditions.checkState(numDictIds > 0);
+    if (numDictIds == 0) {
+      return EmptyFilterBlock.getInstance();
+    }
     if (numDictIds == 1) {
       ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) _invertedIndexReader.getDocIds(dictIds[0]);
       if (_exclusive) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index a0b7cae..d99bac7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -63,6 +63,12 @@ public class FilterOperatorUtils {
       }
       return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
     } else if (predicateType == Predicate.Type.REGEXP_LIKE) {
+      if (dataSource.getFSTIndex() != null && dataSource.getDataSourceMetadata().isSorted()) {
+        return new SortedIndexBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+      }
+      if (dataSource.getFSTIndex() != null && dataSource.getInvertedIndex() != null) {
+        return new BitmapBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
+      }
       return new ScanBasedFilterOperator(predicateEvaluator, dataSource, numDocs);
     } else {
       if (dataSource.getDataSourceMetadata().isSorted() && dataSource.getDictionary() != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/FSTBasedRegexpPredicateEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/FSTBasedRegexpPredicateEvaluatorFactory.java
new file mode 100644
index 0000000..63f881a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/FSTBasedRegexpPredicateEvaluatorFactory.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter.predicate;
+
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.TextIndexReader;
+import org.apache.pinot.core.util.fst.RegexpMatcher;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+import static org.apache.pinot.core.query.request.context.predicate.Predicate.Type.REGEXP_LIKE;
+
+
+/**
+ * Factory for dictionary-based REGEXP_LIKE predicate evaluators used when the FST index is enabled.
+ */
+public class FSTBasedRegexpPredicateEvaluatorFactory {
+    public FSTBasedRegexpPredicateEvaluatorFactory() {} // NOTE(review): class exposes only static factories
+
+    /**
+     * Creates a predicate evaluator that matches the regexp query pattern using the
+     * given FST index reader. The FST index is not yet present for consuming segments,
+     * so use newAutomatonBasedEvaluator for consuming segments.
+     *
+     * @param fstIndexReader FST index reader
+     * @param dictionary Dictionary for the column
+     * @param regexpQuery input query to match
+     * @return Predicate evaluator
+     */
+    public static BaseDictionaryBasedPredicateEvaluator newFSTBasedEvaluator(
+            TextIndexReader fstIndexReader, Dictionary dictionary, String regexpQuery) {
+        return new FSTBasedRegexpPredicateEvaluatorFactory.FSTBasedRegexpPredicateEvaluator(
+                fstIndexReader, dictionary, regexpQuery);
+    }
+
+    /**
+     * Creates a predicate evaluator whose regex matching logic is similar to that of
+     * FSTBasedRegexpPredicateEvaluator. This predicate evaluator is used for consuming
+     * segments, so that results stay consistent between consuming and rolled out
+     * segments when the FST index is enabled.
+     *
+     * @param dictionary Dictionary for the column
+     * @param regexpQuery input query to match
+     * @return Predicate evaluator
+     */
+    public static BaseDictionaryBasedPredicateEvaluator newAutomatonBasedEvaluator(
+            Dictionary dictionary, String regexpQuery) {
+        return new FSTBasedRegexpPredicateEvaluatorFactory.AutomatonBasedRegexpPredicateEvaluator(
+                regexpQuery, dictionary);
+    }
+
+    /**
+     * Predicate evaluator intended for consuming segments. It tests each dictionary value with a
+     * RegexpMatcher, following logic very similar to that of FSTBasedRegexpPredicateEvaluator,
+     * so the results stay consistent across consuming and rolled out segments.
+     */
+    private static class AutomatonBasedRegexpPredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
+        private final RegexpMatcher _regexpMatcher;
+        private final Dictionary _dictionary;
+        int[] _matchingDictIds; // Filled lazily on the first getMatchingDictIds() call
+
+        public AutomatonBasedRegexpPredicateEvaluator(String searchQuery, Dictionary dictionary) {
+            _regexpMatcher = new RegexpMatcher(searchQuery, null);
+            _dictionary = dictionary;
+        }
+
+        @Override
+        public Predicate.Type getPredicateType() {
+            return REGEXP_LIKE;
+        }
+
+        @Override
+        public boolean applySV(int dictId) {
+            return _regexpMatcher.match(_dictionary.getStringValue(dictId));
+        }
+
+        @Override
+        public int[] getMatchingDictIds() {
+            if (_matchingDictIds == null) {
+                IntList matchingDictIds = new IntArrayList();
+                int dictionarySize = _dictionary.length();
+                for (int dictId = 0; dictId < dictionarySize; dictId++) {
+                    if (applySV(dictId)) {
+                        matchingDictIds.add(dictId);
+                    }
+                }
+                _matchingDictIds = matchingDictIds.toIntArray();
+            }
+            return _matchingDictIds;
+        }
+    }
+
+    /**
+     * Matches the regexp query using the FST index reader; matching dict ids are fetched once at construction.
+     */
+    private static class FSTBasedRegexpPredicateEvaluator extends BaseDictionaryBasedPredicateEvaluator {
+        private final TextIndexReader _fstIndexReader;
+        private final String _searchQuery;
+        private final ImmutableRoaringBitmap _dictIds;
+        private final Dictionary _dictionary;
+
+        public FSTBasedRegexpPredicateEvaluator(
+                TextIndexReader fstIndexReader,
+                Dictionary dictionary,
+                String searchQuery) {
+            _dictionary = dictionary;
+            _fstIndexReader = fstIndexReader;
+            _searchQuery = searchQuery;
+            _dictIds = _fstIndexReader.getDictIds(_searchQuery);
+        }
+
+        @Override
+        public boolean isAlwaysFalse() {
+            return _dictIds.isEmpty();
+        }
+
+        @Override
+        public boolean isAlwaysTrue() {
+            return _dictIds.getCardinality() == _dictionary.length();
+        }
+
+        @Override
+        public Predicate.Type getPredicateType() {
+            return REGEXP_LIKE;
+        }
+
+        @Override
+        public boolean applySV(int dictId) {
+            return _dictIds.contains(dictId);
+        }
+
+        @Override
+        public int[] getMatchingDictIds() {
+            return _dictIds.toArray(); // NOTE(review): toArray() runs on every call; consider caching the result
+        }
+    }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 7e076e0..c677010 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -23,7 +23,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+
 import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
 import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator;
@@ -32,13 +34,16 @@ import org.apache.pinot.core.operator.filter.ExpressionFilterOperator;
 import org.apache.pinot.core.operator.filter.FilterOperatorUtils;
 import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
 import org.apache.pinot.core.operator.filter.TextMatchFilterOperator;
+import org.apache.pinot.core.operator.filter.predicate.FSTBasedRegexpPredicateEvaluatorFactory;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RegexpLikePredicate;
 import org.apache.pinot.core.query.request.context.predicate.TextMatchPredicate;
+import org.apache.pinot.core.segment.index.datasource.MutableDataSource;
 import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
 import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
 import org.apache.pinot.core.util.QueryOptions;
@@ -129,6 +134,32 @@ public class FilterPlanNode implements PlanNode {
             case TEXT_MATCH:
               return new TextMatchFilterOperator(dataSource.getTextIndex(), ((TextMatchPredicate) predicate).getValue(),
                   _numDocs);
+            case REGEXP_LIKE:
+              PredicateEvaluator evaluator = null;
+
+              // The FST index is available only for rolled out segments, so we use different
+              // evaluators for rolled out and consuming segments.
+              //
+              // Rolled out segments (immutable): if an FST index reader is available, use the
+              // FSTBasedEvaluator; otherwise use the regular flow of getting a predicate evaluator.
+              //
+              // Consuming segments: when the FST index is enabled, use the AutomatonBasedEvaluator so that
+              // regexp matching logic is similar to that of the FSTBasedEvaluator; otherwise use the regular
+              // flow of getting a predicate evaluator.
+              if (dataSource.getFSTIndex() != null) {
+                evaluator = FSTBasedRegexpPredicateEvaluatorFactory
+                    .newFSTBasedEvaluator(dataSource.getFSTIndex(), dataSource.getDictionary(),
+                        ((RegexpLikePredicate) predicate).getValue());
+              } else if (dataSource instanceof MutableDataSource && ((MutableDataSource) dataSource)
+                  .hasFSTIndexEnabled()) {
+                evaluator = FSTBasedRegexpPredicateEvaluatorFactory
+                    .newAutomatonBasedEvaluator(dataSource.getDictionary(),
+                        ((RegexpLikePredicate) predicate).getValue());
+              } else {
+                evaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource.getDictionary(),
+                    dataSource.getDataSourceMetadata().getDataType());
+              }
+              return FilterOperatorUtils.getLeafFilterOperator(evaluator, dataSource, _numDocs);
             case IS_NULL:
               NullValueVectorReader nullValueVector = dataSource.getNullValueVector();
               if (nullValueVector != null) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 0bdf2a9..aadb81e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -52,14 +52,15 @@ public class RealtimeSegmentConverter {
   private final String _sortedColumn;
   private final List<String> _invertedIndexColumns;
   private final List<String> _textIndexColumns;
+  private final List<String> _fstIndexColumns;
   private final List<String> _noDictionaryColumns;
   private final List<String> _varLengthDictionaryColumns;
   private final boolean _nullHandlingEnabled;
 
   public RealtimeSegmentConverter(MutableSegmentImpl realtimeSegment, String outputPath, Schema schema,
       String tableName, TableConfig tableConfig, String segmentName, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> textIndexColumns, List<String> noDictionaryColumns,
-      List<String> varLengthDictionaryColumns, boolean nullHandlingEnabled) {
+      List<String> invertedIndexColumns, List<String> textIndexColumns, List<String> fstIndexColumns,
+      List<String> noDictionaryColumns, List<String> varLengthDictionaryColumns, boolean nullHandlingEnabled) {
     _realtimeSegmentImpl = realtimeSegment;
     _outputPath = outputPath;
     _invertedIndexColumns = new ArrayList<>(invertedIndexColumns);
@@ -75,6 +76,7 @@ public class RealtimeSegmentConverter {
     _varLengthDictionaryColumns = varLengthDictionaryColumns;
     _nullHandlingEnabled = nullHandlingEnabled;
     _textIndexColumns = textIndexColumns;
+    _fstIndexColumns = fstIndexColumns;
   }
 
   public void build(@Nullable SegmentVersion segmentVersion, ServerMetrics serverMetrics)
@@ -116,6 +118,7 @@ public class RealtimeSegmentConverter {
     genConfig.setOutDir(_outputPath);
     genConfig.setSegmentName(_segmentName);
     genConfig.setTextIndexCreationColumns(_textIndexColumns);
+    genConfig.setFSTIndexCreationColumns(_fstIndexColumns);
     SegmentPartitionConfig segmentPartitionConfig = _realtimeSegmentImpl.getSegmentPartitionConfig();
     genConfig.setSegmentPartitionConfig(segmentPartitionConfig);
     genConfig.setNullHandlingEnabled(_nullHandlingEnabled);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
index 069d2a7..7a6337d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeSegmentSegmentCreationDataSource.java
@@ -48,7 +48,8 @@ public class RealtimeSegmentSegmentCreationDataSource implements SegmentCreation
       throw new RuntimeException("Incompatible schemas used for conversion and extraction");
     }
 
-    return new RealtimeSegmentStatsContainer(_realtimeSegment, _realtimeSegmentRecordReader);
+    return new RealtimeSegmentStatsContainer(
+            _realtimeSegment, _realtimeSegmentRecordReader);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
index b24fffd..7997931 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
@@ -40,6 +40,7 @@ public class RealtimeSegmentConfig {
   private final Set<String> _varLengthDictionaryColumns;
   private final Set<String> _invertedIndexColumns;
   private final Set<String> _textIndexColumns;
+  private final Set<String> _fstIndexColumns;
   private final RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
   private final boolean _offHeap;
   private final PinotDataBufferMemoryManager _memoryManager;
@@ -54,9 +55,9 @@ public class RealtimeSegmentConfig {
   private final String _consumerDir;
 
   // TODO: Clean up this constructor. Most of these things can be extracted from tableConfig.
-  private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema,
-      String timeColumnName, int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns,
-      Set<String> varLengthDictionaryColumns, Set<String> invertedIndexColumns, Set<String> textIndexColumns,
+  private RealtimeSegmentConfig(String tableNameWithType, String segmentName, String streamName, Schema schema, String timeColumnName,
+      int capacity, int avgNumMultiValues, Set<String> noDictionaryColumns, Set<String> varLengthDictionaryColumns,
+      Set<String> invertedIndexColumns, Set<String> textIndexColumns, Set<String> fstIndexColumns,
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata, boolean offHeap, PinotDataBufferMemoryManager memoryManager,
       RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction,
       int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir,
@@ -72,6 +73,7 @@ public class RealtimeSegmentConfig {
     _varLengthDictionaryColumns = varLengthDictionaryColumns;
     _invertedIndexColumns = invertedIndexColumns;
     _textIndexColumns = textIndexColumns;
+    _fstIndexColumns = fstIndexColumns;
     _realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
     _offHeap = offHeap;
     _memoryManager = memoryManager;
@@ -135,6 +137,10 @@ public class RealtimeSegmentConfig {
     return _textIndexColumns;
   }
 
+  public Set<String> getFSTIndexColumns() {
+    return _fstIndexColumns;
+  }
+
   public RealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata() {
     return _realtimeSegmentZKMetadata;
   }
@@ -195,6 +201,7 @@ public class RealtimeSegmentConfig {
     private Set<String> _varLengthDictionaryColumns;
     private Set<String> _invertedIndexColumns;
     private Set<String> _textIndexColumns = new HashSet<>();
+    private Set<String> _fstIndexColumns = new HashSet<>();
     private RealtimeSegmentZKMetadata _realtimeSegmentZKMetadata;
     private boolean _offHeap;
     private PinotDataBufferMemoryManager _memoryManager;
@@ -274,6 +281,11 @@ public class RealtimeSegmentConfig {
       return this;
     }
 
+    public Builder setFSTIndexColumns(Set<String> fstIndexColumns) {
+      _fstIndexColumns = fstIndexColumns;
+      return this;
+    }
+
     public Builder setRealtimeSegmentZKMetadata(RealtimeSegmentZKMetadata realtimeSegmentZKMetadata) {
       _realtimeSegmentZKMetadata = realtimeSegmentZKMetadata;
       return this;
@@ -337,9 +349,9 @@ public class RealtimeSegmentConfig {
     public RealtimeSegmentConfig build() {
       return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
           _capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
-          _textIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager, _statsHistory, _partitionColumn,
-          _partitionFunction, _partitionId, _aggregateMetrics, _nullHandlingEnabled, _consumerDir, _upsertMode,
-          _partitionUpsertMetadataManager);
+          _textIndexColumns, _fstIndexColumns, _realtimeSegmentZKMetadata, _offHeap, _memoryManager,
+          _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
+          _nullHandlingEnabled, _consumerDir, _upsertMode, _partitionUpsertMetadataManager);
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
index 5023858..f3d39e4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/invertedindex/RealtimeLuceneTextIndexReader.java
@@ -30,6 +30,7 @@ import org.apache.lucene.search.SearcherManager;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.core.segment.index.readers.TextIndexReader;
 import org.roaringbitmap.IntIterator;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.LoggerFactory;
 
@@ -90,6 +91,11 @@ public class RealtimeLuceneTextIndexReader implements TextIndexReader {
   }
 
   @Override
+  public ImmutableRoaringBitmap getDictIds(String searchQuery) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public MutableRoaringBitmap getDocIds(String searchQuery) {
     MutableRoaringBitmap docIDs = new MutableRoaringBitmap();
     Collector docIDCollector = new RealtimeLuceneDocIdCollector(docIDs);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/TextIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/TextIndexType.java
index 7930277..bbe5206 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/TextIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/TextIndexType.java
@@ -20,5 +20,6 @@ package org.apache.pinot.core.segment.creator;
 
 public enum TextIndexType {
   NONE,
-  LUCENE
+  LUCENE,
+  LUCENE_FST
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 6aca94e..e388a37 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -51,6 +51,7 @@ import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueUnsortedForward
 import org.apache.pinot.core.segment.creator.impl.fwd.SingleValueVarByteRawIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.inv.OnHeapBitmapInvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.nullvalue.NullValueVectorCreator;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.spi.config.table.FieldConfig;
@@ -88,6 +89,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
   private Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
   private Map<String, TextIndexCreator> _textIndexCreatorMap = new HashMap<>();
+  private Map<String, TextIndexCreator> _fstIndexCreatorMap = new HashMap<>();
   private Map<String, NullValueVectorCreator> _nullValueVectorCreatorMap = new HashMap<>();
   private String segmentName;
   private Schema schema;
@@ -98,6 +100,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Map<String, Map<String, String>> _columnProperties;
 
   private final Set<String> _textIndexColumns = new HashSet<>();
+  private final Set<String> _fstIndexColumns = new HashSet<>();
 
   @Override
   public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
@@ -131,6 +134,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       _textIndexColumns.add(columnName);
     }
 
+    for (String columnName : config.getFSTIndexCreationColumns()) {
+      Preconditions.checkState(schema.hasColumn(columnName),
+          "Cannot create fst index for column: %s because it is not in schema", columnName);
+      _fstIndexColumns.add(columnName);
+    }
+
     // Initialize creators for dictionary, forward index and inverted index
     for (FieldSpec fieldSpec : fieldSpecs) {
       // Ignore virtual columns
@@ -141,8 +150,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       String columnName = fieldSpec.getName();
       ColumnIndexCreationInfo indexCreationInfo = indexCreationInfoMap.get(columnName);
       Preconditions.checkNotNull(indexCreationInfo, "Missing index creation info for column: %s", columnName);
+      boolean dictEnabledColumn = createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec);
 
-      if (createDictionaryForColumn(indexCreationInfo, segmentCreationSpec, fieldSpec)) {
+      if (dictEnabledColumn) {
         // Create dictionary-encoded index
 
         // Initialize dictionary creator
@@ -218,6 +228,17 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
             .put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
       }
 
+      if (_fstIndexColumns.contains(columnName)) {
+        Preconditions.checkState(fieldSpec.isSingleValueField(),
+            "FST index is currently only supported on single-value columns");
+        Preconditions.checkState(fieldSpec.getDataType() == DataType.STRING,
+            "FST index is only supported on STRING type columns");
+        Preconditions.checkState(dictEnabledColumn,
+            "FST index is currently only supported on dictionary columns");
+        _fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
+            (String[]) indexCreationInfo.getSortedUniqueElementsArray()));
+      }
+
       _nullHandlingEnabled = config.isNullHandlingEnabled();
       if (_nullHandlingEnabled) {
         // Initialize Null value vector map
@@ -334,7 +355,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
           // store the docID -> dictID mapping in forward index
           forwardIndexCreator.putDictId(dictId);
-          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator =
+                  _invertedIndexCreatorMap.get(columnName);
           if (invertedIndexCreator != null) {
             // if inverted index enabled during segment creation,
             // then store dictID -> docID mapping in inverted index
@@ -422,6 +444,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     for (TextIndexCreator textIndexCreator : _textIndexCreatorMap.values()) {
       textIndexCreator.seal();
     }
+    for (TextIndexCreator fstIndexCreator : _fstIndexCreatorMap.values()) {
+      fstIndexCreator.seal();
+    }
     for (NullValueVectorCreator nullValueVectorCreator : _nullValueVectorCreatorMap.values()) {
       nullValueVectorCreator.seal();
     }
@@ -510,13 +535,14 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       boolean hasInvertedIndex = true;
 
       boolean hasTextIndex = _textIndexColumns.contains(column);
+      boolean hasFSTIndex = _fstIndexColumns.contains(column);
       // for new generated segment we write as NONE if text index does not exist
       // for reading existing segments that don't have this property, non-existence
       // of this property will be treated as NONE. See the builder in ColumnMetadata
       TextIndexType textIndexType = hasTextIndex ? TextIndexType.LUCENE : TextIndexType.NONE;
 
       addColumnMetadataInfo(properties, column, columnIndexCreationInfo, totalDocs, schema.getFieldSpecFor(column),
-          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex, textIndexType);
+          _dictionaryCreatorMap.containsKey(column), dictionaryElementSize, hasInvertedIndex, hasFSTIndex, textIndexType);
     }
 
     properties.save();
@@ -524,7 +550,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   public static void addColumnMetadataInfo(PropertiesConfiguration properties, String column,
       ColumnIndexCreationInfo columnIndexCreationInfo, int totalDocs, FieldSpec fieldSpec, boolean hasDictionary,
-      int dictionaryElementSize, boolean hasInvertedIndex, TextIndexType textIndexType) {
+      int dictionaryElementSize, boolean hasInvertedIndex, boolean hasFSTIndex, TextIndexType textIndexType) {
     int cardinality = columnIndexCreationInfo.getDistinctValueCount();
     properties.setProperty(getKeyFor(column, CARDINALITY), String.valueOf(cardinality));
     properties.setProperty(getKeyFor(column, TOTAL_DOCS), String.valueOf(totalDocs));
@@ -539,6 +565,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(getKeyFor(column, HAS_DICTIONARY), String.valueOf(hasDictionary));
     properties.setProperty(getKeyFor(column, TEXT_INDEX_TYPE), textIndexType.name());
     properties.setProperty(getKeyFor(column, HAS_INVERTED_INDEX), String.valueOf(hasInvertedIndex));
+    properties.setProperty(getKeyFor(column, HAS_FST_INDEX), String.valueOf(hasFSTIndex));
     properties.setProperty(getKeyFor(column, IS_SINGLE_VALUED), String.valueOf(fieldSpec.isSingleValueField()));
     properties.setProperty(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS),
         String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 0a09394..3e27a03 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -37,6 +37,7 @@ public class V1Constants {
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
     public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
+    public static final String FST_INDEX_FILE_EXTENSION = ".lucene.fst";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
     public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
   }
@@ -77,6 +78,7 @@ public class V1Constants {
       public static final String HAS_NULL_VALUE = "hasNullValue";
       public static final String HAS_DICTIONARY = "hasDictionary";
       public static final String HAS_INVERTED_INDEX = "hasInvertedIndex";
+      public static final String HAS_FST_INDEX = "hasFSTIndex";
       public static final String IS_SINGLE_VALUED = "isSingleValues";
       public static final String MAX_MULTI_VALUE_ELEMTS = "maxNumberOfMultiValues";
       public static final String TOTAL_NUMBER_OF_ENTRIES = "totalNumberOfEntries";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
new file mode 100644
index 0000000..0c741ac
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneFSTIndexCreator.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv.text;
+
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.fst.FST;
+import org.apache.pinot.core.segment.creator.TextIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.core.util.fst.FSTBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
+
+/**
+ * FST index creator for dictionary-encoded columns. Each entry maps a key (column value) to its dictionary id,
+ * i.e. the position of the value among the sorted unique values of the column. Entries must therefore be added
+ * in sorted order, either through the constructor or through {@link #add(String)}.
+ */
+public class LuceneFSTIndexCreator implements TextIndexCreator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LuceneFSTIndexCreator.class);
+  private final File _fstIndexFile;
+  private final FSTBuilder _fstBuilder;
+  // Dictionary id that will be assigned to the next entry passed to add().
+  Integer _dictId;
+
+  /**
+   * Creates the index, optionally pre-populated. Values must be added in sorted order, either by passing them
+   * here or by calling {@link #add(String)}; the position of a sorted entry is used as its dictionary id.
+   *
+   * @param indexDir  Index directory
+   * @param columnName Column name for which index is being created
+   * @param sortedEntries Sorted unique values of the column; may be null when entries are added later
+   * @throws IOException if an entry cannot be added to the FST builder
+   */
+  public LuceneFSTIndexCreator(File indexDir, String columnName, String[] sortedEntries)
+      throws IOException {
+    _fstIndexFile = new File(indexDir, columnName + FST_INDEX_FILE_EXTENSION);
+    _fstBuilder = new FSTBuilder();
+    int numEntries = sortedEntries != null ? sortedEntries.length : 0;
+    for (int dictId = 0; dictId < numEntries; dictId++) {
+      _fstBuilder.addEntry(sortedEntries[dictId], dictId);
+    }
+    _dictId = numEntries;
+  }
+
+  /**
+   * Adds the next dictionary entry, keyed to the next dictionary id. Expects entries in sorted order.
+   * @param document Dictionary value to add
+   */
+  @Override
+  public void add(String document) {
+    try {
+      _fstBuilder.addEntry(document, _dictId);
+      _dictId++;
+    } catch (IOException ex) {
+      throw new RuntimeException("Unable to add entry to FST index: " + _fstIndexFile, ex);
+    }
+  }
+
+  /**
+   * Builds the FST from the entries added so far and writes it to the index file.
+   *
+   * @throws IOException if the index file cannot be written
+   */
+  @Override
+  public void seal()
+      throws IOException {
+    LOGGER.info("Sealing FST index: {}", _fstIndexFile.getAbsolutePath());
+    // try-with-resources closes the stream without masking a failure raised while saving.
+    try (FileOutputStream fileOutputStream = new FileOutputStream(_fstIndexFile)) {
+      FST<Long> fst = _fstBuilder.done();
+      fst.save(new OutputStreamDataOutput(fileOutputStream));
+    }
+  }
+
+  /** No-op: the output stream is opened and closed within {@link #seal()}. */
+  @Override
+  public void close()
+      throws IOException {
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
index 087f4b2..11e7b9a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -53,6 +53,11 @@ public interface ColumnIndexContainer extends Closeable {
   TextIndexReader getTextIndex();
 
   /**
+   * Returns the fst index for the column, or {@code null} if it does not exist.
+   */
+  TextIndexReader getFSTIndex();
+
+  /**
    * Returns the dictionary for the column, or {@code null} if it does not exist.
    */
   Dictionary getDictionary();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index 1fe955c..6de1db5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.IntDictionary;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.index.readers.LuceneFSTIndexReader;
 import org.apache.pinot.core.segment.index.readers.NullValueVectorReaderImpl;
 import org.apache.pinot.core.segment.index.readers.OnHeapDoubleDictionary;
 import org.apache.pinot.core.segment.index.readers.OnHeapFloatDictionary;
@@ -67,6 +68,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   private final InvertedIndexReader<?> _invertedIndex;
   private final InvertedIndexReader<?> _rangeIndex;
   private final TextIndexReader _textIndex;
+  private final TextIndexReader _fstIndex;
   private final BaseImmutableDictionary _dictionary;
   private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReaderImpl _nullValueVectorReader;
@@ -80,6 +82,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     boolean loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
     boolean loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
     BloomFilterConfig bloomFilterConfig = indexLoadingConfig.getBloomFilterConfigs().get(columnName);
+    boolean loadFSTIndex = indexLoadingConfig.getFSTIndexColumns().contains(columnName);
 
     if (segmentReader.hasIndexFor(columnName, ColumnIndexType.NULLVALUE_VECTOR)) {
       PinotDataBuffer nullValueVectorBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.NULLVALUE_VECTOR);
@@ -119,6 +122,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
           _forwardIndex = sortedIndexReader;
           _invertedIndex = sortedIndexReader;
           _rangeIndex = null;
+          _fstIndex = null;
           return;
         } else {
           // Unsorted
@@ -137,6 +141,15 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       } else {
         _invertedIndex = null;
       }
+
+
+      if (loadFSTIndex) {
+        _fstIndex = new LuceneFSTIndexReader(
+            segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX));
+      } else {
+        _fstIndex = null;
+      }
+
       if (loadRangeIndex) {
         _rangeIndex = new RangeIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX));
       } else {
@@ -149,6 +162,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       _bloomFilter = null;
       _rangeIndex = null;
       _invertedIndex = null;
+      _fstIndex = null;
     }
   }
 
@@ -183,6 +197,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   }
 
   @Override
+  public TextIndexReader getFSTIndex() {
+    return _fstIndex;
+  }
+
+  @Override
   public NullValueVectorReaderImpl getNullValueVector() {
     return _nullValueVectorReader;
   }
@@ -264,5 +283,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     if (_bloomFilter != null) {
       _bloomFilter.close();
     }
+    if (_fstIndex != null) {
+      _fstIndex.close();
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/converter/SegmentV1V2ToV3FormatConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
index 77ffec8..aa9b18f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/converter/SegmentV1V2ToV3FormatConverter.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -151,12 +152,15 @@ public class SegmentV1V2ToV3FormatConverter implements SegmentFormatConverter {
           if (v2DataReader.hasIndexFor(column, ColumnIndexType.NULLVALUE_VECTOR)) {
             copyNullValueVector(v2DataReader, v3DataWriter, column);
           }
+          // Copies FST index if there is one available.
+          copyExistingFSTIndex(v2DataReader, v3DataWriter, column);
         }
 
         // inverted indexes are intentionally stored at the end of the single file
         for (String column : allColumns) {
           copyExistingInvertedIndex(v2DataReader, v3DataWriter, column);
         }
+
         v3DataWriter.saveAndClose();
       }
     }
@@ -196,6 +200,13 @@ public class SegmentV1V2ToV3FormatConverter implements SegmentFormatConverter {
     }
   }
 
+  private void copyExistingFSTIndex(SegmentDirectory.Reader reader, SegmentDirectory.Writer writer, String column)
+      throws Exception {
+    if (reader.hasIndexFor(column, ColumnIndexType.FST_INDEX)) {
+      readCopyBuffers(reader, writer, column, ColumnIndexType.FST_INDEX);
+    }
+  }
+
   private void readCopyBuffers(SegmentDirectory.Reader reader, SegmentDirectory.Writer writer, String column,
       ColumnIndexType indexType)
       throws IOException {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
index 172e8e6..eb3dcc7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.LuceneFSTIndexReader;
 import org.apache.pinot.core.segment.index.readers.NullValueVectorReader;
 import org.apache.pinot.core.segment.index.readers.TextIndexReader;
 
@@ -36,19 +37,24 @@ public abstract class BaseDataSource implements DataSource {
   private final InvertedIndexReader<?> _invertedIndex;
   private final InvertedIndexReader<?> _rangeIndex;
   private final TextIndexReader _textIndex;
+  private final TextIndexReader _fstIndex;
   private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReader _nullValueVector;
 
   public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex,
-      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
-      @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
-      @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
+                        @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
+                        @Nullable InvertedIndexReader<?> rangeIndex,
+                        @Nullable TextIndexReader textIndex,
+                        @Nullable TextIndexReader fstIndex,
+                        @Nullable BloomFilterReader bloomFilter,
+                        @Nullable NullValueVectorReader nullValueVector) {
     _dataSourceMetadata = dataSourceMetadata;
     _forwardIndex = forwardIndex;
     _dictionary = dictionary;
     _invertedIndex = invertedIndex;
     _rangeIndex = rangeIndex;
     _textIndex = textIndex;
+    _fstIndex = fstIndex;
     _bloomFilter = bloomFilter;
     _nullValueVector = nullValueVector;
   }
@@ -83,6 +89,12 @@ public abstract class BaseDataSource implements DataSource {
 
   @Nullable
   @Override
+  public TextIndexReader getFSTIndex() {
+    return _fstIndex;
+  }
+
+  @Nullable
+  @Override
   public TextIndexReader getTextIndex() {
     return _textIndex;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
index cc09adf..16a0e1c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
@@ -36,6 +36,7 @@ public class ImmutableDataSource extends BaseDataSource {
     super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(),
         columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(),
         columnIndexContainer.getRangeIndex(), columnIndexContainer.getTextIndex(),
+        columnIndexContainer.getFSTIndex(),
         columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index a927353..e9ae0df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -36,16 +36,25 @@ import org.apache.pinot.spi.data.FieldSpec;
  */
 @SuppressWarnings("rawtypes")
 public class MutableDataSource extends BaseDataSource {
+  final boolean _fstIndexEnabled;
 
-  public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
+  public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry, boolean fstIndexEnabled,
       @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
       @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary,
       @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
-      @Nullable TextIndexReader textIndex, @Nullable BloomFilterReader bloomFilter,
+      @Nullable TextIndexReader textIndex, @Nullable TextIndexReader fstIndex,
+      @Nullable BloomFilterReader bloomFilter,
       @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitions, minValue, maxValue), forwardIndex, dictionary, invertedIndex, rangeIndex, textIndex, bloomFilter,
-        nullValueVector);
+                    partitions, minValue, maxValue), forwardIndex, dictionary,
+            invertedIndex, rangeIndex, textIndex, fstIndex,
+            bloomFilter, nullValueVector);
+    _fstIndexEnabled = fstIndexEnabled;
+  }
+
+  // Returns whether the current field has fst index enabled.
+  public boolean hasFSTIndexEnabled() {
+    return _fstIndexEnabled;
   }
 
   private static class MutableDataSourceMetadata implements DataSourceMetadata {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 04d91d1..3310c73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -48,6 +48,7 @@ public class IndexLoadingConfig {
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
+  private Set<String> _fstIndexColumns = new HashSet<>();
   private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
@@ -121,6 +122,7 @@ public class IndexLoadingConfig {
     }
 
     extractTextIndexColumnsFromTableConfig(tableConfig);
+    extractFSTIndexColumnsFromTableConfig(tableConfig);
 
     Map<String, String> noDictionaryConfig = indexingConfig.getNoDictionaryConfig();
     if (noDictionaryConfig != null) {
@@ -173,6 +175,18 @@ public class IndexLoadingConfig {
     }
   }
 
+  /** Adds every column whose field config declares {@code IndexType.FST} to {@code _fstIndexColumns}. */
+  private void extractFSTIndexColumnsFromTableConfig(TableConfig tableConfig) {
+    List<FieldConfig> fieldConfigs = tableConfig.getFieldConfigList();
+    if (fieldConfigs != null) {
+      for (FieldConfig config : fieldConfigs) {
+        if (config.getIndexType() == FieldConfig.IndexType.FST) {
+          _fstIndexColumns.add(config.getName());
+        }
+      }
+    }
+  }
+
   private void extractFromInstanceConfig(InstanceDataManagerConfig instanceDataManagerConfig) {
     ReadMode instanceReadMode = instanceDataManagerConfig.getReadMode();
     if (instanceReadMode != null) {
@@ -248,6 +262,10 @@ public class IndexLoadingConfig {
     return _textIndexColumns;
   }
 
+  public Set<String> getFSTIndexColumns() {
+    return _fstIndexColumns;
+  }
+
   /**
    * For tests only.
    */
@@ -280,6 +298,10 @@ public class IndexLoadingConfig {
     _bloomFilterConfigs = bloomFilterConfigs;
   }
 
+  public void setFSTIndexColumns(Set<String> fstIndexColumns) {
+    _fstIndexColumns = fstIndexColumns;
+  }
+
   @VisibleForTesting
   public void setOnHeapDictionaryColumns(Set<String> onHeapDictionaryColumns) {
     _onHeapDictionaryColumns = onHeapDictionaryColumns;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index ca7f7e9..d1fc1d9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.LuceneFSTIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
@@ -120,6 +121,14 @@ public class SegmentPreProcessor implements AutoCloseable {
         textIndexHandler.createTextIndexesOnSegmentLoad();
       }
 
+      // Create FST index if required
+      Set<String> fstIndexColumns = _indexLoadingConfig.getFSTIndexColumns();
+      if (!fstIndexColumns.isEmpty()) {
+        LuceneFSTIndexHandler luceneFSTIndexHandler =
+            new LuceneFSTIndexHandler(_indexDir, _segmentMetadata, fstIndexColumns, segmentWriter);
+        luceneFSTIndexHandler.createFSTIndexesOnSegmentLoad();
+      }
+
       // Create bloom filter if required
       BloomFilterHandler bloomFilterHandler =
           new BloomFilterHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 1122264..f60a1ff 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -411,6 +411,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
     // Add the column metadata information to the metadata properties.
     SegmentColumnarIndexCreator
         .addColumnMetadataInfo(_segmentProperties, column, columnIndexCreationInfo, totalDocs, fieldSpec,
-            true/*hasDictionary*/, dictionaryElementSize, true/*hasInvertedIndex*/, TextIndexType.NONE);
+            true/*hasDictionary*/, dictionaryElementSize, true/*hasInvertedIndex*/,
+            false/*hasFSTIndex*/, TextIndexType.NONE);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java
new file mode 100644
index 0000000..4e52007
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/LuceneFSTIndexHandler.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.index.readers.StringDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Helper class used by {@link org.apache.pinot.core.segment.index.loader.SegmentPreProcessor}
+ * to create the FST index for a column at segment load time. The FST index is normally
+ * created (if enabled on a column) during segment generation, so the scenarios are:
+ *
+ * (1) A new segment with FST index is created/refreshed. Server loads the segment. The handler
+ * detects the existence of FST index and returns.
+ *
+ * (2) A reload is issued on an existing segment with existing FST index. The handler
+ * detects the existence of FST index and returns.
+ *
+ * (3) A reload is issued on an existing segment after FST index is enabled on an existing
+ * column. Reads the dictionary to create FST index.
+ *
+ * (4) A reload is issued on an existing segment after FST index is enabled on a newly
+ * added column. In this case, the default column handler would have taken care of adding
+ * dictionary for the new column. Read the dictionary to create FST index.
+ */
+public class LuceneFSTIndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(LuceneFSTIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _fstIndexColumns = new HashSet<>();
+
+  public LuceneFSTIndexHandler(File indexDir, SegmentMetadataImpl segmentMetadata, Set<String> fstIndexColumns,
+      SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    for (String column : fstIndexColumns) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null) {
+        _fstIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  private void checkUnsupportedOperationsForFSTIndex(ColumnMetadata columnMetadata) {
+    String column = columnMetadata.getColumnName();
+    if (columnMetadata.getDataType() != FieldSpec.DataType.STRING) {
+      throw new UnsupportedOperationException("FST index is currently only supported on STRING columns: " + column);
+    }
+
+    if (!columnMetadata.hasDictionary()) {
+      throw new UnsupportedOperationException(
+          "FST index is currently only supported on dictionary encoded columns: " + column);
+    }
+
+    if (!columnMetadata.isSingleValue()) {
+      throw new UnsupportedOperationException("FST index is currently not supported on multi-value columns: " + column);
+    }
+  }
+
+  public void createFSTIndexesOnSegmentLoad()
+      throws Exception {
+    for (ColumnMetadata columnMetadata : _fstIndexColumns) {
+      checkUnsupportedOperationsForFSTIndex(columnMetadata);
+      createFSTIndexForColumn(columnMetadata);
+    }
+  }
+
+  private BaseImmutableDictionary getDictionaryReader(ColumnMetadata columnMetadata)
+      throws IOException {
+    PinotDataBuffer dictionaryBuffer =
+        _segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    return new StringDictionary(dictionaryBuffer, columnMetadata.getCardinality(), columnMetadata.getColumnMaxLength(),
+        (byte) columnMetadata.getPaddingCharacter());
+  }
+
+  private void createFSTIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    String column = columnMetadata.getColumnName();
+    File inProgress = new File(_indexDir, column + ".fst.inprogress");
+    File fstIndexFile = new File(_indexDir, column + FST_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.FST_INDEX)) {
+        // Skip creating fst index if already exists.
+        LOGGER.info("Found fst index for column: {}, in segment: {}", column, _segmentName);
+        return;
+      }
+
+      // Create a marker file; it is deleted again only after the index has been built.
+      FileUtils.touch(inProgress);
+    } else {
+      FileUtils.deleteQuietly(fstIndexFile); // Marker exists: an earlier attempt did not finish, rebuild.
+    }
+
+    LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, _segmentName,
+        columnMetadata.getCardinality());
+    LuceneFSTIndexCreator luceneFSTIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);
+    try (BaseImmutableDictionary dictionary = getDictionaryReader(columnMetadata);) {
+      for (int dictId = 0; dictId < dictionary.length(); dictId++) {
+        luceneFSTIndexCreator.add(dictionary.getStringValue(dictId));
+      }
+    }
+    luceneFSTIndexCreator.seal();
+
+    // For v3, write the generated FST index file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, fstIndexFile, ColumnIndexType.FST_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+    LOGGER.info("Created FST index for segment: {}, column: {}", _segmentName, column);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
index d8bf349..43990ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/ColumnMetadata.java
@@ -64,6 +64,7 @@ public class ColumnMetadata {
   private final boolean hasDictionary;
   @JsonProperty
   private final boolean hasInvertedIndex;
+  private final boolean hasFSTIndex;
   private final boolean isSingleValue;
   private final boolean isVirtual;
   private final int maxNumberOfMultiValues;
@@ -97,6 +98,7 @@ public class ColumnMetadata {
     builder.setContainsNulls(config.getBoolean(getKeyFor(column, HAS_NULL_VALUE)));
     builder.setHasDictionary(config.getBoolean(getKeyFor(column, HAS_DICTIONARY), true));
     builder.setHasInvertedIndex(config.getBoolean(getKeyFor(column, HAS_INVERTED_INDEX)));
+    builder.setHasFSTIndex(config.getBoolean(getKeyFor(column, HAS_FST_INDEX), false));
     builder.setSingleValue(config.getBoolean(getKeyFor(column, IS_SINGLE_VALUED)));
     builder.setMaxNumberOfMultiValues(config.getInt(getKeyFor(column, MAX_MULTI_VALUE_ELEMTS)));
     builder.setTotalNumberOfEntries(config.getInt(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES)));
@@ -198,6 +200,7 @@ public class ColumnMetadata {
     private boolean containsNulls;
     private boolean hasDictionary;
     private boolean hasInvertedIndex;
+    private boolean hasFSTIndex;
     private boolean isSingleValue;
     private boolean isVirtual;
     private int maxNumberOfMultiValues;
@@ -265,6 +268,11 @@ public class ColumnMetadata {
       return this;
     }
 
+    public Builder setHasFSTIndex(boolean hasFSTIndex) {
+      this.hasFSTIndex = hasFSTIndex;
+      return this;
+    }
+
     public Builder setHasInvertedIndex(boolean hasInvertedIndex) {
       this.hasInvertedIndex = hasInvertedIndex;
       return this;
@@ -354,7 +362,7 @@ public class ColumnMetadata {
           fieldType, isSorted, containsNulls, hasDictionary, hasInvertedIndex, isSingleValue, maxNumberOfMultiValues,
           totalNumberOfEntries, isAutoGenerated, isVirtual, defaultNullValueString, timeUnit, paddingCharacter,
           minValue, maxValue, partitionFunction, numPartitions, _partitions, dateTimeFormat, dateTimeGranularity,
-          TextIndexType.valueOf(textIndexType));
+          hasFSTIndex, TextIndexType.valueOf(textIndexType));
     }
   }
 
@@ -364,7 +372,7 @@ public class ColumnMetadata {
       boolean isAutoGenerated, boolean isVirtual, String defaultNullValueString, TimeUnit timeUnit,
       char paddingCharacter, Comparable minValue, Comparable maxValue, PartitionFunction partitionFunction,
       int numPartitions, Set<Integer> partitions, String dateTimeFormat, String dateTimeGranularity,
-      TextIndexType textIndexType) {
+      boolean hasFSTIndex, TextIndexType textIndexType) {
     this.columnName = columnName;
     this.cardinality = cardinality;
     this.totalDocs = totalDocs;
@@ -376,6 +384,7 @@ public class ColumnMetadata {
     this.containsNulls = hasNulls;
     this.hasDictionary = hasDictionary;
     this.hasInvertedIndex = hasInvertedIndex;
+    this.hasFSTIndex = hasFSTIndex;
     this.isSingleValue = isSingleValue;
     this.maxNumberOfMultiValues = maxNumberOfMultiValues;
     this.totalNumberOfEntries = totalNumberOfEntries;
@@ -461,6 +470,10 @@ public class ColumnMetadata {
     return hasInvertedIndex;
   }
 
+  public boolean hasFSTIndex() {
+    return hasFSTIndex;
+  }
+
   public boolean isSingleValue() {
     return isSingleValue;
   }
@@ -572,7 +585,8 @@ public class ColumnMetadata {
           .getPartitionFunction()) && getNumPartitions() == columnMetadata.getNumPartitions()
           && getPartitions() == (columnMetadata.getPartitions()) && getDateTimeFormat() == (columnMetadata
           .getDateTimeFormat()) && getDateTimeGranularity() == (columnMetadata.getDateTimeGranularity())
-          && getTextIndexType().equals(columnMetadata.getTextIndexType());
+          && hasFSTIndex() == columnMetadata.hasFSTIndex() && getTextIndexType()
+          .equals(columnMetadata.getTextIndexType());
     }
     return false;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneFSTIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneFSTIndexReader.java
new file mode 100644
index 0000000..575a04b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/LuceneFSTIndexReader.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import org.apache.lucene.util.fst.FST;
+import org.apache.lucene.util.fst.OffHeapFSTStore;
+import org.apache.lucene.util.fst.PositiveIntOutputs;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.fst.RegexpMatcher;
+import org.apache.pinot.core.util.fst.PinotBufferIndexInput;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * Reader for the Lucene FST index of a column. It loads the FST from a {@link PinotDataBuffer}
+ * and uses it to resolve regexp queries. The values handed back by the FST are treated as
+ * dictionary ids, so only {@link #getDictIds(String)} is supported; {@link #getDocIds(String)}
+ * always throws {@link UnsupportedOperationException}.
+ */
+public class LuceneFSTIndexReader implements TextIndexReader {
+  public static final Logger LOGGER = LoggerFactory.getLogger(LuceneFSTIndexReader.class);
+
+  private final PinotDataBuffer _dataBuffer;
+  private final PinotBufferIndexInput _dataBufferIndexInput;
+  private final FST<Long> _readFST;
+
+  /** Exposes the whole buffer as a Lucene input and reads the FST from it with an off-heap store. */
+  public LuceneFSTIndexReader(PinotDataBuffer pinotDataBuffer)
+      throws IOException {
+    _dataBuffer = pinotDataBuffer;
+    _dataBufferIndexInput = new PinotBufferIndexInput(_dataBuffer, 0L, _dataBuffer.size());
+    _readFST = new FST<>(_dataBufferIndexInput, PositiveIntOutputs.getSingleton(), new OffHeapFSTStore());
+  }
+
+  @Override
+  public MutableRoaringBitmap getDocIds(String searchQuery) {
+    throw new UnsupportedOperationException("LuceneFSTIndexReader only supports getDictIds currently.");
+  }
+
+  @Override
+  public ImmutableRoaringBitmap getDictIds(String searchQuery) {
+    try {
+      MutableRoaringBitmap dictIds = new MutableRoaringBitmap();
+      List<Long> matchingIds = RegexpMatcher.regexMatch(searchQuery, _readFST);
+      for (Long matchingId : matchingIds) {
+        dictIds.add(matchingId.intValue());
+      }
+      return dictIds.toImmutableRoaringBitmap();
+    } catch (Exception ex) {
+      LOGGER.error("Error getting matching Ids from FST", ex);
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    // Nothing to release here: this reader never closes the data buffer it was handed.
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/TextIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/TextIndexReader.java
index adda14b..17dcf78 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/TextIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/TextIndexReader.java
@@ -19,12 +19,20 @@
 package org.apache.pinot.core.segment.index.readers;
 
 import java.io.Closeable;
+
+import org.apache.commons.lang.mutable.Mutable;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 public interface TextIndexReader extends Closeable {
 
   /**
+   * Returns the matching dictionary ids for the given search query (optional operation; may be unsupported).
+   */
+  ImmutableRoaringBitmap getDictIds(String searchQuery);
+
+  /**
    * Returns the matching document ids for the given search query.
    */
   MutableRoaringBitmap getDocIds(String searchQuery);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/text/LuceneTextIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/text/LuceneTextIndexReader.java
index 38a6025..ad55a7d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/text/LuceneTextIndexReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/text/LuceneTextIndexReader.java
@@ -39,6 +39,7 @@ import org.apache.pinot.core.segment.index.readers.TextIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.core.segment.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.config.table.FieldConfig;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.LoggerFactory;
 
@@ -122,6 +123,11 @@ public class LuceneTextIndexReader implements TextIndexReader {
   }
 
   @Override
+  public ImmutableRoaringBitmap getDictIds(String searchQuery) {
+    throw new UnsupportedOperationException("LuceneTextIndexReader only supports getDocIds currently.");
+  }
+
+  @Override
   public MutableRoaringBitmap getDocIds(String searchQuery) {
     MutableRoaringBitmap docIds = new MutableRoaringBitmap();
     Collector docIDCollector = new LuceneDocIdCollector(docIds, _docIdTranslator);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index dcd21df..7f11ecf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -25,6 +25,7 @@ public enum ColumnIndexType {
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
   TEXT_INDEX("text_index"),
+  FST_INDEX("fst_index"),
   RANGE_INDEX("range_index");
 
   private final String indexName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index 51dd2fb..7be4a8b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -27,11 +27,15 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.LuceneFSTIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
 
 class FilePerIndexDirectory extends ColumnIndexDirectory {
   private static Logger LOGGER = LoggerFactory.getLogger(FilePerIndexDirectory.class);
@@ -135,6 +139,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case TEXT_INDEX:
         filename = column + LuceneTextIndexCreator.LUCENE_TEXT_INDEX_FILE_EXTENSION;
         break;
+      case FST_INDEX:
+        filename = column + FST_INDEX_FILE_EXTENSION;
+        break;
       default:
         throw new UnsupportedOperationException("Unknown index type: " + indexType.toString());
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentDirectoryPaths.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentDirectoryPaths.java
index a062fee..71fa77a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentDirectoryPaths.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SegmentDirectoryPaths.java
@@ -25,8 +25,11 @@ import javax.annotation.Nullable;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader;
 
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
 
 public class SegmentDirectoryPaths {
   private SegmentDirectoryPaths() {
@@ -85,6 +88,12 @@ public class SegmentDirectoryPaths {
     return findFormatFile(indexDir, luceneIndexDirectory);
   }
 
+  /** Looks up the FST index file of {@code column} under {@code indexDir} via {@code findFormatFile}. */
+  public static File findFSTIndexIndexFile(File indexDir, String column) {
+    return findFormatFile(indexDir, column + FST_INDEX_FILE_EXTENSION);
+  }
+
+
   @Nullable
   @VisibleForTesting
   public static File findTextIndexDocIdMappingFile(File indexDir, String column) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
index 5aebed0..6ba81f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/SingleFileIndexDirectory.java
@@ -36,12 +36,15 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
 
 // There are a couple of un-addressed issues right now
 //
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index dc67771..e6d2fe5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -64,6 +64,11 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
   }
 
   @Override
+  public TextIndexReader getFSTIndex() {
+    return null;
+  }
+
+  @Override
   public Dictionary getDictionary() {
     return _dictionary;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
index c24fb04..809eb35 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
@@ -32,7 +32,9 @@ public class StarTreeDataSource extends BaseDataSource {
 
   public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, ForwardIndexReader<?> forwardIndex,
       @Nullable Dictionary dictionary) {
-    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null, null, null);
+    // Only the metadata, forward index and dictionary are supplied; every other super-constructor argument is null.
+    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs),
+            forwardIndex, dictionary, null, null,
+            null, null, null, null);
   }
 
   private static final class StarTreeDataSourceMetadata implements DataSourceMetadata {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index 8854d30..15519b5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -86,7 +86,7 @@ public final class TableConfigUtils {
     validateIngestionConfig(tableConfig, schema);
     validateTierConfigList(tableConfig.getTierConfigsList());
     validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
-    validateFieldConfigList(tableConfig.getFieldConfigList(), schema);
+    validateFieldConfigList(tableConfig.getFieldConfigList(), tableConfig.getIndexingConfig(), schema);
     validateUpsertConfig(tableConfig, schema);
   }
 
@@ -480,7 +480,8 @@ public final class TableConfigUtils {
    * Validates the Field Config List in the given TableConfig
    * Ensures that every referred column name exists in the corresponding schema
    */
-  private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList, @Nullable Schema schema) {
+  private static void validateFieldConfigList(@Nullable List<FieldConfig> fieldConfigList,
+      @Nullable IndexingConfig indexingConfigs, @Nullable Schema schema) {
     if (fieldConfigList == null || schema == null) {
       return;
     }
@@ -489,6 +490,17 @@ public final class TableConfigUtils {
       String columnName = fieldConfig.getName();
       Preconditions.checkState(schema.getFieldSpecFor(columnName) != null,
           "Column Name " + columnName + " defined in field config list must be a valid column defined in the schema");
+
+      // A DICTIONARY-encoded field must not also be listed as a no-dictionary column. The indexing config is
+      // declared @Nullable and its no-dictionary list may not be configured, so the cross-check is skipped when
+      // either one is absent instead of failing with a NullPointerException.
+      if (fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY && indexingConfigs != null
+          && indexingConfigs.getNoDictionaryColumns() != null) {
+        Preconditions.checkArgument(!indexingConfigs.getNoDictionaryColumns().contains(columnName),
+            "FieldConfig encoding type is different from indexingConfig for column: " + columnName);
+      }
+
+      // FST Index is only available on dictionary encoded columns.
+      if (fieldConfig.getIndexType() == FieldConfig.IndexType.FST) {
+        Preconditions.checkArgument(fieldConfig.getEncodingType() == FieldConfig.EncodingType.DICTIONARY,
+            "FST Index is only enabled on dictionary encoded columns");
+      }
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/fst/FSTBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/FSTBuilder.java
new file mode 100644
index 0000000..18fff3f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/FSTBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util.fst;
+
+import org.apache.lucene.util.IntsRefBuilder;
+import org.apache.lucene.util.fst.Builder;
+import org.apache.lucene.util.fst.FST;
+import org.apache.lucene.util.fst.PositiveIntOutputs;
+import org.apache.lucene.util.fst.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.SortedMap;
+
+
+/**
+ * Builds a Lucene {@link FST} that maps string keys to {@code long} outputs, using
+ * {@link org.apache.lucene.util.fst.Builder}.
+ *
+ * <p>All keys must be added in sorted order. Use {@link #buildFST(SortedMap)} to build from a complete map in
+ * one call, or create an instance, call {@link #addEntry(String, Integer)} once per key and finish with
+ * {@link #done()}.
+ */
+public class FSTBuilder {
+  public static final Logger LOGGER = LoggerFactory.getLogger(FSTBuilder.class);
+  private final Builder<Long> _builder = new Builder<>(FST.INPUT_TYPE.BYTE4, PositiveIntOutputs.getSingleton());
+  // Reused for every key so that encoding a key does not allocate a new buffer.
+  private final IntsRefBuilder _scratch = new IntsRefBuilder();
+
+  /**
+   * Builds an FST from every entry of {@code input}, added in the map's iteration order.
+   *
+   * @param input keys and the values to store for them
+   * @return the finished FST
+   * @throws IOException if the underlying Lucene builder fails
+   */
+  public static FST<Long> buildFST(SortedMap<String, Integer> input)
+      throws IOException {
+    FSTBuilder fstBuilder = new FSTBuilder();
+    for (Map.Entry<String, Integer> entry : input.entrySet()) {
+      fstBuilder.addEntry(entry.getKey(), entry.getValue());
+    }
+    return fstBuilder.done();
+  }
+
+  /**
+   * Adds one key with its value. Keys must be added in sorted order.
+   *
+   * @param key key to add
+   * @param value value stored for the key; it is widened to {@code long}
+   * @throws IOException if the underlying Lucene builder fails
+   */
+  public void addEntry(String key, Integer value)
+      throws IOException {
+    _builder.add(Util.toUTF16(key, _scratch), value.longValue());
+  }
+
+  /**
+   * Finishes the build and returns the FST containing all entries added so far.
+   *
+   * @return the finished FST, as returned by the underlying Lucene builder
+   * @throws IOException if the underlying Lucene builder fails
+   */
+  public FST<Long> done()
+      throws IOException {
+    return _builder.finish();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/fst/PinotBufferIndexInput.java b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/PinotBufferIndexInput.java
new file mode 100644
index 0000000..dcb8534
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/PinotBufferIndexInput.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util.fst;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+
+import java.io.IOException;
+
+
+/**
+ * An {@link IndexInput} backed by a {@link PinotDataBuffer}.
+ *
+ * <p>It bridges the Lucene FST reader ({@code org.apache.lucene.util.fst.FST}) and {@link PinotDataBuffer}, so
+ * that an FST index held in a Pinot buffer can be read through the Lucene API.
+ *
+ * <p>An instance exposes {@code length} bytes of the buffer, starting at the absolute buffer offset given to the
+ * constructor. Positions passed to {@link #seek(long)} and returned by {@link #getFilePointer()} are relative to
+ * that start. {@link #close()} does nothing; this class never releases the buffer.
+ */
+public class PinotBufferIndexInput extends IndexInput {
+  private final PinotDataBuffer pinotDataBuffer;
+  // Absolute offset in the buffer at which this input (or slice) begins.
+  private final long sliceOffset;
+  // Number of bytes exposed by this input.
+  private final long length;
+  // Absolute offset in the buffer of the next byte to read.
+  private long readPointerOffset;
+
+  /**
+   * @param pinotDataBuffer buffer holding the data
+   * @param offset absolute offset in the buffer where this input starts
+   * @param length number of bytes exposed by this input
+   */
+  public PinotBufferIndexInput(PinotDataBuffer pinotDataBuffer, Long offset, Long length) {
+    super("");
+    this.pinotDataBuffer = pinotDataBuffer;
+    this.sliceOffset = offset;
+    this.readPointerOffset = offset;
+    this.length = length;
+  }
+
+  /** Does nothing; the underlying buffer is left untouched. */
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Returns the position of the next read relative to the start of this input, i.e. in the same coordinates that
+   * {@link #seek(long)} accepts.
+   */
+  @Override
+  public long getFilePointer() {
+    return readPointerOffset - sliceOffset;
+  }
+
+  /** Moves the read position to {@code pos}, measured from the start of this input. */
+  @Override
+  public void seek(long pos) throws IOException {
+    readPointerOffset = sliceOffset + pos;
+  }
+
+  @Override
+  public long length() {
+    return length;
+  }
+
+  /**
+   * Creates a new input over the same buffer. {@code offset} is relative to the start of this input; the
+   * description is not used.
+   */
+  @Override
+  public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+    return new PinotBufferIndexInput(pinotDataBuffer, sliceOffset + offset, length);
+  }
+
+  /** Reads the byte at the current position and advances the position by one. */
+  @Override
+  public byte readByte() throws IOException {
+    byte b = pinotDataBuffer.getByte(readPointerOffset);
+    readPointerOffset++;
+    return b;
+  }
+
+  /** Reads {@code length} bytes into {@code bytes}, starting at index {@code destOffset}. */
+  @Override
+  public void readBytes(byte[] bytes, int destOffset, int length) throws IOException {
+    for (int i = 0; i < length; i++) {
+      bytes[destOffset + i] = readByte();
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/fst/RegexpMatcher.java b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/RegexpMatcher.java
new file mode 100644
index 0000000..76447ad
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/fst/RegexpMatcher.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util.fst;
+
+import org.apache.lucene.util.IntsRefBuilder;
+import org.apache.lucene.util.automaton.Automaton;
+import org.apache.lucene.util.automaton.CharacterRunAutomaton;
+import org.apache.lucene.util.automaton.RegExp;
+import org.apache.lucene.util.automaton.Transition;
+import org.apache.lucene.util.fst.FST;
+import org.apache.lucene.util.fst.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Finds the FST outputs whose keys match a regular expression.
+ *
+ * <p>The regexp is compiled into a Lucene {@link Automaton} once, at construction time. Two operations are offered:
+ * <ul>
+ *   <li>{@link #regexMatchOnFST()} walks the automaton and the FST together and returns the outputs of all
+ *       matching keys.</li>
+ *   <li>{@link #match(String)} checks a single input string against the automaton.</li>
+ * </ul>
+ */
+public class RegexpMatcher {
+  public static final Logger LOGGER = LoggerFactory.getLogger(RegexpMatcher.class);
+
+  private final String _regexQuery;
+  private final FST<Long> _fst;
+  private final Automaton _automaton;
+
+  /**
+   * @param regexQuery regular expression, in the syntax accepted by Lucene's {@link RegExp}
+   * @param fst FST to search
+   */
+  public RegexpMatcher(String regexQuery, FST<Long> fst) {
+    _regexQuery = regexQuery;
+    _fst = fst;
+    _automaton = (new RegExp(_regexQuery)).toAutomaton();
+  }
+
+  /**
+   * Convenience wrapper: builds a matcher for {@code regexQuery} and runs {@link #regexMatchOnFST()} on {@code fst}.
+   *
+   * @param regexQuery regular expression to match keys against
+   * @param fst FST to search
+   * @return outputs of all matching keys
+   * @throws IOException if reading the FST fails
+   */
+  public static List<Long> regexMatch(String regexQuery, FST<Long> fst)
+      throws IOException {
+    RegexpMatcher matcher = new RegexpMatcher(regexQuery, fst);
+    return matcher.regexMatchOnFST();
+  }
+
+  /**
+   * Runs the automaton built from the regexp over {@code input}.
+   *
+   * @param input string to test
+   * @return whether the automaton accepts {@code input}
+   */
+  public boolean match(String input) {
+    CharacterRunAutomaton characterRunAutomaton = new CharacterRunAutomaton(_automaton);
+    return characterRunAutomaton.run(input);
+  }
+
+  /**
+   * Walks the regexp automaton and the FST in lock-step and collects the outputs of all matching keys.
+   *
+   * <p>Both structures are state machines driven by input labels. The search keeps a work list of
+   * (automaton state, FST arc) pairs, seeded with the two start states. Each step takes the most recently added
+   * pair from the work list and:
+   * <ol>
+   *   <li>records it as a match if the automaton state is accepting and the FST arc is final;</li>
+   *   <li>for every automaton transition leaving the state, looks up the FST arcs whose label falls inside the
+   *       transition's [min, max] range and adds the resulting pairs to the work list.</li>
+   * </ol>
+   * Every new pair moves one arc further along the FST, which is a DAG, so the search terminates.
+   *
+   * @return the outputs stored in the FST for all matching keys, in the order the matches were found; empty if the
+   *         automaton has no states
+   * @throws IOException if reading the FST fails
+   */
+  public List<Long> regexMatchOnFST()
+      throws IOException {
+    if (_automaton.getNumStates() == 0) {
+      return Collections.emptyList();
+    }
+
+    final List<Path<Long>> queue = new ArrayList<>();
+    final List<Path<Long>> endNodes = new ArrayList<>();
+
+    // Automaton start state and FST start node is added to the queue.
+    queue.add(new Path<>(0, _fst.getFirstArc(new FST.Arc<Long>()), _fst.outputs.getNoOutput(), new IntsRefBuilder()));
+
+    // Scratch arc reused by the FST lookups; every arc that is kept is copied out of it first.
+    final FST.Arc<Long> scratchArc = new FST.Arc<>();
+    final FST.BytesReader fstReader = _fst.getBytesReader();
+
+    Transition t = new Transition();
+    while (!queue.isEmpty()) {
+      final Path<Long> path = queue.remove(queue.size() - 1);
+
+      // If automaton is in accept state and the fstNode is final (i.e. end node) then add the entry to endNodes which
+      // contains the result set.
+      if (_automaton.isAccept(path.state) && path.fstNode.isFinal()) {
+        endNodes.add(path);
+      }
+
+      // Gather next set of transitions on automaton and find target nodes in FST.
+      IntsRefBuilder currentInput = path.input;
+      int count = _automaton.initTransition(path.state, t);
+      for (int i = 0; i < count; i++) {
+        _automaton.getNextTransition(t);
+        final int min = t.min;
+        final int max = t.max;
+        if (min == max) {
+          // Single-label transition: at most one FST arc can follow it.
+          final FST.Arc<Long> nextArc = _fst.findTargetArc(t.min, path.fstNode, scratchArc, fstReader);
+          if (nextArc != null) {
+            final IntsRefBuilder newInput = new IntsRefBuilder();
+            newInput.copyInts(currentInput.get());
+            newInput.append(t.min);
+            queue.add(new Path<Long>(t.dest, new FST.Arc<Long>().copyFrom(nextArc),
+                _fst.outputs.add(path.output, nextArc.output), newInput));
+          }
+        } else {
+          // Range transition: start at the first arc with label >= min and follow sibling arcs up to max.
+          FST.Arc<Long> nextArc = Util.readCeilArc(min, _fst, path.fstNode, scratchArc, fstReader);
+          while (nextArc != null && nextArc.label <= max) {
+            final IntsRefBuilder newInput = new IntsRefBuilder();
+            newInput.copyInts(currentInput.get());
+            newInput.append(nextArc.label);
+            queue.add(
+                new Path<>(t.dest, new FST.Arc<Long>().copyFrom(nextArc), _fst.outputs.add(path.output, nextArc.output),
+                    newInput));
+            nextArc = nextArc.isLast() ? null : _fst.readNextRealArc(nextArc, fstReader);
+          }
+        }
+      }
+    }
+
+    // From the result set of matched entries gather the values stored and return. A final arc can carry an extra
+    // final output that is not part of the outputs accumulated along the path, so it is added here.
+    List<Long> matchedIds = new ArrayList<>(endNodes.size());
+    for (Path<Long> path : endNodes) {
+      matchedIds.add(_fst.outputs.add(path.output, path.fstNode.nextFinalOutput));
+    }
+    return matchedIds;
+  }
+
+  /**
+   * One entry of the search work list: an automaton state paired with the FST arc reached so far, the output
+   * accumulated along the way and the input labels consumed to get there.
+   */
+  public static final class Path<T> {
+    public final int state;
+    public final FST.Arc<T> fstNode;
+    public final T output;
+    public final IntsRefBuilder input;
+
+    public Path(int state, FST.Arc<T> fstNode, T output, IntsRefBuilder input) {
+      this.state = state;
+      this.fstNode = fstNode;
+      this.output = output;
+      this.input = input;
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/LuceneFSTIndexCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/LuceneFSTIndexCreatorTest.java
new file mode 100644
index 0000000..f602592
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/LuceneFSTIndexCreatorTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import java.nio.ByteOrder;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
+import org.apache.pinot.core.segment.index.readers.LuceneFSTIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Writes an FST index with {@link LuceneFSTIndexCreator} and reads it back with {@link LuceneFSTIndexReader}.
+ */
+public class LuceneFSTIndexCreatorTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "LuceneFSTIndex");
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  @Test
+  public void testIndexWriterReader()
+      throws IOException {
+    String[] uniqueValues = new String[3];
+    uniqueValues[0] = "hello-world";
+    uniqueValues[1] = "hello-world123";
+    uniqueValues[2] = "still";
+
+    LuceneFSTIndexCreator creator = new LuceneFSTIndexCreator(INDEX_DIR, "testFSTColumn", uniqueValues);
+    creator.seal();
+    File fstFile = new File(INDEX_DIR, "testFSTColumn" + FST_INDEX_FILE_EXTENSION);
+    PinotDataBuffer pinotDataBuffer =
+        PinotDataBuffer.mapFile(fstFile, true, 0, fstFile.length(), ByteOrder.BIG_ENDIAN, "fstIndexFile");
+    LuceneFSTIndexReader reader = new LuceneFSTIndexReader(pinotDataBuffer);
+
+    // TestNG's assertEquals takes (actual, expected). The expected ids 0 and 1 are the positions of the two
+    // "hello..." values in uniqueValues.
+    int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
+    Assert.assertEquals(matchedDictIds.length, 2);
+    Assert.assertEquals(matchedDictIds[0], 0);
+    Assert.assertEquals(matchedDictIds[1], 1);
+
+    // No value ends with "llo", so the empty result shows the pattern is applied to the whole value rather than
+    // to a substring of it.
+    matchedDictIds = reader.getDictIds(".*llo").toArray();
+    Assert.assertEquals(matchedDictIds.length, 0);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
index 32a4c0a..36678a4 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/LoaderTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
+import org.apache.lucene.util.fst.FST;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.CommonConstants.Segment.BuiltInVirtualColumn;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -34,10 +35,12 @@ import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.text.LuceneFSTIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.text.LuceneTextIndexCreator;
 import org.apache.pinot.core.segment.index.converter.SegmentV1V2ToV3FormatConverter;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.LuceneFSTIndexReader;
 import org.apache.pinot.core.segment.index.readers.StringDictionary;
 import org.apache.pinot.core.segment.index.readers.text.LuceneTextIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -57,6 +60,8 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;
+
 
 public class LoaderTest {
   private static final File INDEX_DIR = new File(LoaderTest.class.getName());
@@ -66,6 +71,7 @@ public class LoaderTest {
   private static final String PADDING_NULL = "data/paddingNull.tar.gz";
 
   private static final String TEXT_INDEX_COL_NAME = "column5";
+  private static final String FST_INDEX_COL_NAME = "column5";
 
   private File _avroFile;
   private File _indexDir;
@@ -275,6 +281,139 @@ public class LoaderTest {
     indexSegment.destroy();
   }
 
+  /**
+   * Rebuilds the test segment from the Avro file in the given on-disk version, with FST index creation requested
+   * for {@code FST_INDEX_COL_NAME}, and points {@code _indexDir} at the resulting segment directory.
+   */
+  private void constructSegmentWithFSTIndex(SegmentVersion segmentVersion)
+      throws Exception {
+    // Start from a clean output directory.
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    SegmentGeneratorConfig generatorConfig =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(_avroFile, INDEX_DIR, "testTable");
+    generatorConfig.setFSTIndexCreationColumns(Lists.newArrayList(FST_INDEX_COL_NAME));
+    generatorConfig.setSegmentVersion(segmentVersion);
+
+    SegmentIndexCreationDriver creationDriver = SegmentCreationDriverFactory.get(null);
+    creationDriver.init(generatorConfig);
+    creationDriver.build();
+
+    _indexDir = new File(INDEX_DIR, creationDriver.getSegmentName());
+  }
+
+  /**
+   * Builds a segment with an FST index on {@code FST_INDEX_COL_NAME} and loads it through
+   * {@link ImmutableSegmentLoader}, first from a V3 on-disk segment and then from a V1 on-disk segment, with and
+   * without an explicit segment version in the {@link IndexLoadingConfig}.
+   *
+   * For V3 segments the test expects no standalone FST index file and instead checks for the index through the
+   * segment directory reader; for V1 segments it expects a standalone FST index file in the segment directory.
+   */
+  @Test
+  public void testFSTIndexLoad()
+      throws Exception {
+    constructSegmentWithFSTIndex(SegmentVersion.v3);
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v3);
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    verifyIndexDirIsV3(_indexDir);
+    File fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
+    Assert.assertNull(fstIndexFile);
+
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Arrays.asList(FST_INDEX_COL_NAME)));
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    IndexSegment indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+
+    SegmentDirectory segmentDir = SegmentDirectory.createFromLocalFS(_indexDir, ReadMode.heap);
+    SegmentDirectory.Reader reader = segmentDir.createReader();
+    Assert.assertNotNull(reader);
+    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, ColumnIndexType.FST_INDEX));
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig as V3
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is same as the version of segment on disk (V3)
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    // check that index dir is not in V1 format (the only subdir it should have is V3)
+    verifyIndexDirIsV3(_indexDir);
+    segmentDir = SegmentDirectory.createFromLocalFS(_indexDir, ReadMode.heap);
+    reader = segmentDir.createReader();
+    Assert.assertNotNull(reader);
+    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, ColumnIndexType.FST_INDEX));
+    indexSegment.destroy();
+
+    // Test for scenarios by creating on-disk segment in V1 and then loading
+    // the segment with and without specifying segmentVersion in IndexLoadingConfig
+
+    // create on-disk segment in V1
+    // this generates the segment in V1 and does not convert to V3 as part of post-creation processing
+    constructSegmentWithFSTIndex(SegmentVersion.v1);
+
+    // check that segment on-disk version is V1 after creation
+    Assert.assertEquals(new SegmentMetadataImpl(_indexDir).getSegmentVersion(), SegmentVersion.v1);
+    // check that segment v1 dir exists
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    // check that v3 index sub-dir does not exist
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
+    Assert.assertNotNull(fstIndexFile);
+    Assert.assertFalse(fstIndexFile.isDirectory());
+    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + FST_INDEX_FILE_EXTENSION);
+    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new SegmentMetadataImpl(_indexDir).getName());
+    // NOTE(review): indexSegment was already destroyed above and has not been reloaded since; confirm that this
+    // second destroy() on the same instance is intended.
+    indexSegment.destroy();
+
+    // CASE 1: don't set the segment version to load in IndexLoadingConfig
+    // there should be no conversion done by ImmutableSegmentLoader and it should
+    // be able to create fst index reader with on-disk version V1
+    indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Arrays.asList(FST_INDEX_COL_NAME)));
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
+    Assert.assertNotNull(fstIndexFile);
+    Assert.assertFalse(fstIndexFile.isDirectory());
+    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + FST_INDEX_FILE_EXTENSION);
+    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new SegmentMetadataImpl(_indexDir).getName());
+    indexSegment.destroy();
+
+    // CASE 2: set the segment version to load in IndexLoadingConfig to V1
+    // there should be no conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is same as the version of segment on disk
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v1);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v1
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v1.toString());
+    // no change/conversion should have happened in indexDir
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v1).exists());
+    Assert.assertFalse(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
+    Assert.assertNotNull(fstIndexFile);
+    Assert.assertFalse(fstIndexFile.isDirectory());
+    Assert.assertEquals(fstIndexFile.getName(), FST_INDEX_COL_NAME + FST_INDEX_FILE_EXTENSION);
+    Assert.assertEquals(fstIndexFile.getParentFile().getName(), new SegmentMetadataImpl(_indexDir).getName());
+    indexSegment.destroy();
+
+    // CASE 3: set the segment version to load in IndexLoadingConfig to V3
+    // there should be conversion done by ImmutableSegmentLoader since the segmentVersionToLoad
+    // is different than the version of segment on disk
+    indexLoadingConfig.setSegmentVersion(SegmentVersion.v3);
+    indexSegment = ImmutableSegmentLoader.load(_indexDir, indexLoadingConfig);
+    // check that loaded segment version is v3
+    Assert.assertEquals(indexSegment.getSegmentMetadata().getVersion(), SegmentVersion.v3.toString());
+    // the index dir should exist in v3 format due to conversion
+    Assert.assertTrue(SegmentDirectoryPaths.segmentDirectoryFor(_indexDir, SegmentVersion.v3).exists());
+    verifyIndexDirIsV3(_indexDir);
+    fstIndexFile = SegmentDirectoryPaths.findFSTIndexIndexFile(_indexDir, FST_INDEX_COL_NAME);
+    Assert.assertNull(fstIndexFile);
+    segmentDir = SegmentDirectory.createFromLocalFS(_indexDir, ReadMode.heap);
+    reader = segmentDir.createReader();
+    Assert.assertNotNull(reader);
+    Assert.assertTrue(reader.hasIndexFor(FST_INDEX_COL_NAME, ColumnIndexType.FST_INDEX));
+    indexSegment.destroy();
+  }
+
   private void constructSegmentWithTextIndex(SegmentVersion segmentVersion)
       throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
index 2f9c6ac..325a17e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
@@ -75,10 +75,14 @@ public class SegmentPreProcessorTest {
   private static final String NEWLY_ADDED_STRING_COL_RAW = "newTextColRaw";
   private static final String NEWLY_ADDED_STRING_COL_DICT = "newTextColDict";
 
+  // For create fst index tests
+  private static final String NEWLY_ADDED_FST_COL_DICT = "newFSTColDict";
+
   // For update default value tests.
   private static final String NEW_COLUMNS_SCHEMA1 = "data/newColumnsSchema1.json";
   private static final String NEW_COLUMNS_SCHEMA2 = "data/newColumnsSchema2.json";
   private static final String NEW_COLUMNS_SCHEMA3 = "data/newColumnsSchema3.json";
+  private static final String NEW_COLUMNS_SCHEMA4 = "data/newColumnsSchema4.json";
   private static final String NEW_COLUMNS_SCHEMA_WITH_TEXT = "data/newColumnsWithTextSchema.json";
   private static final String NEW_INT_METRIC_COLUMN_NAME = "newIntMetric";
   private static final String NEW_LONG_METRIC_COLUMN_NAME = "newLongMetric";
@@ -98,6 +102,7 @@ public class SegmentPreProcessorTest {
   private Schema _newColumnsSchema1;
   private Schema _newColumnsSchema2;
   private Schema _newColumnsSchema3;
+  private Schema _newColumnsSchema4;
   private Schema _newColumnsSchemaWithText;
 
   @BeforeClass
@@ -135,6 +140,9 @@ public class SegmentPreProcessorTest {
     resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA_WITH_TEXT);
     Assert.assertNotNull(resourceUrl);
     _newColumnsSchemaWithText = Schema.fromFile(new File(resourceUrl.getFile()));
+    resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA4);
+    Assert.assertNotNull(resourceUrl);
+    _newColumnsSchema4 = Schema.fromFile(new File(resourceUrl.getFile()));
   }
 
   private void constructV1Segment()
@@ -198,6 +206,64 @@ public class SegmentPreProcessorTest {
     checkTextIndexCreation(NEWLY_ADDED_STRING_COL_RAW, 1, 1, _newColumnsSchemaWithText, true, true, true, 4);
   }
 
+  @Test
+  public void testEnableFSTIndexOnExistingColumnRaw() throws Exception {
+    Set<String> fstColumns = new HashSet<>();
+    fstColumns.add(EXISTING_STRING_COL_RAW);
+    _indexLoadingConfig.setFSTIndexColumns(fstColumns);
+    _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW);
+    constructV3Segment();  // FST index is dictionary-only: a raw column must be rejected (V3 first, then V1)
+    try (SegmentPreProcessor v3Processor = new SegmentPreProcessor(_indexDir, _indexLoadingConfig, _newColumnsSchema4)) {
+      Assert.expectThrows(UnsupportedOperationException.class, v3Processor::process);
+    }  // try-with-resources closes the pre-processor although process() threw
+    constructV1Segment();
+    try (SegmentPreProcessor v1Processor = new SegmentPreProcessor(_indexDir, _indexLoadingConfig, _newColumnsSchema4)) {
+      Assert.expectThrows(UnsupportedOperationException.class, v1Processor::process);
+    }
+  }
+
+  /**
+   * Requests an FST index on the newly added dictionary column from schema 4 and verifies, for
+   * both the V3 and the V1 segment layout, that the index is created during pre-processing.
+   */
+  @Test
+  public void testEnableFSTIndexOnNewColumnDictEncoded() throws Exception {
+    Set<String> columnsWithFst = new HashSet<>();
+    columnsWithFst.add(NEWLY_ADDED_FST_COL_DICT);
+    _indexLoadingConfig.setFSTIndexColumns(columnsWithFst);
+
+    constructV3Segment();
+    checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchema4, true, true, 4);
+
+    constructV1Segment();
+    checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchema4, true, true, 4);
+  }
+
+
+  /**
+   * Enables an FST index on a dictionary-encoded column that already exists in the segment and
+   * verifies, for both the V3 and the V1 layout, that the index gets created. The expected
+   * column statistics (cardinality 9, 4 bits, dictionary element size 26) are those of the fixture.
+   */
+  @Test
+  public void testEnableFSTIndexOnExistingColumnDictEncoded() throws Exception {
+    Set<String> columnsWithFst = new HashSet<>();
+    columnsWithFst.add(EXISTING_STRING_COL_DICT);
+    _indexLoadingConfig.setFSTIndexColumns(columnsWithFst);
+
+    constructV3Segment();
+    // Sanity check: the column is already part of the freshly built segment.
+    ColumnMetadata v3Metadata = new SegmentMetadataImpl(_indexDir).getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+    Assert.assertNotNull(v3Metadata);
+    checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchema4, false, false, 26);
+
+    constructV1Segment();
+    // Same sanity check for the V1 layout.
+    ColumnMetadata v1Metadata = new SegmentMetadataImpl(_indexDir).getColumnMetadataFor(EXISTING_STRING_COL_DICT);
+    Assert.assertNotNull(v1Metadata);
+    checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchema4, false, false, 26);
+  }
+
   /**
    * Test to check for default column handling and text index creation during
    * segment load after a new dictionary encoded column is added to the schema
@@ -297,8 +363,33 @@ public class SegmentPreProcessorTest {
     checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26);
   }
 
-  private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema, boolean isAutoGenerated,
-      boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
+  /**
+   * Verifies FST index creation for {@code column}; delegates to {@link #checkIndexCreation}
+   * with {@code hasDictionary} fixed to {@code true}.
+   */
+  private void checkFSTIndexCreation(String column, int cardinality, int bits, Schema schema,
+      boolean isAutoGenerated, boolean isSorted, int dictionaryElementSize)
+      throws Exception {
+    checkIndexCreation(ColumnIndexType.FST_INDEX, column, cardinality, bits, schema, isAutoGenerated, true, isSorted,
+        dictionaryElementSize);
+  }
+
+  /**
+   * Verifies text index creation via {@link #checkIndexCreation}, then asserts the LUCENE text index type.
+   */
+  private void checkTextIndexCreation(String column, int cardinality, int bits, Schema schema,
+      boolean isAutoGenerated, boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
+      throws Exception {
+    ColumnMetadata metadata = checkIndexCreation(ColumnIndexType.TEXT_INDEX, column, cardinality, bits, schema,
+        isAutoGenerated, hasDictionary, isSorted, dictionaryElementSize);
+    Assert.assertEquals(metadata.getTextIndexType(), TextIndexType.LUCENE);
+  }
+
+  private ColumnMetadata checkIndexCreation(
+          ColumnIndexType indexType,
+          String column, int cardinality,
+          int bits, Schema schema, boolean isAutoGenerated,
+          boolean hasDictionary, boolean isSorted, int dictionaryElementSize)
       throws Exception {
     try (SegmentPreProcessor processor = new SegmentPreProcessor(_indexDir, _indexLoadingConfig, schema)) {
       processor.process();
@@ -314,17 +405,18 @@ public class SegmentPreProcessorTest {
       Assert.assertEquals(columnMetadata.isSorted(), isSorted);
       Assert.assertFalse(columnMetadata.hasNulls());
       Assert.assertEquals(columnMetadata.hasDictionary(), hasDictionary);
-      Assert.assertEquals(columnMetadata.getTextIndexType(), TextIndexType.LUCENE);
       Assert.assertTrue(columnMetadata.isSingleValue());
       Assert.assertEquals(columnMetadata.getMaxNumberOfMultiValues(), 0);
       Assert.assertEquals(columnMetadata.getTotalNumberOfEntries(), 100000);
       Assert.assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated);
       Assert.assertEquals(columnMetadata.getDefaultNullValueString(), "null");
 
-      try (SegmentDirectory segmentDirectory = SegmentDirectory.createFromLocalFS(_indexDir, ReadMode.mmap);
+      try (SegmentDirectory segmentDirectory =
+                   SegmentDirectory.createFromLocalFS(_indexDir, ReadMode.mmap);
           SegmentDirectory.Reader reader = segmentDirectory.createReader()) {
-        Assert.assertTrue(reader.hasIndexFor(column, ColumnIndexType.TEXT_INDEX));
-        Assert.assertTrue(reader.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX));
+        Assert.assertTrue(reader.hasIndexFor(column, indexType));
+        Assert.assertTrue(reader.hasIndexFor(
+                column, ColumnIndexType.FORWARD_INDEX));
         // if the text index is enabled on a new column with dictionary,
         // then dictionary should be created by the default column handler
         if (hasDictionary) {
@@ -333,6 +425,7 @@ public class SegmentPreProcessorTest {
           Assert.assertFalse(reader.hasIndexFor(column, ColumnIndexType.DICTIONARY));
         }
       }
+      return columnMetadata;
     }
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index 9a7a880..227166b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -646,6 +646,50 @@ public class TableConfigUtilsTest {
   }
 
   @Test
+  public void testValidateFieldConfig() {
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension("myCol1", FieldSpec.DataType.STRING).build();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
+    // Case 1: field config says DICTIONARY while the indexing config lists myCol1 as no-dictionary.
+    try {
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol1", FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail for with conflicting encoding type of myCol1");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(),
+          "FieldConfig encoding type is different from indexingConfig for column: myCol1");
+    }
+    // Case 2: encoding is consistently RAW, but an FST index is requested on that raw column.
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
+    try {
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol1", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail since FST index is enabled on RAW encoding type");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(), "FST Index is only enabled on dictionary encoded columns");
+    }
+    // Case 3: the field config references myCol21, a column the schema does not define.
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNoDictionaryColumns(Arrays.asList("myCol1")).build();
+    try {
+      FieldConfig fieldConfig =
+          new FieldConfig("myCol21", FieldConfig.EncodingType.RAW, FieldConfig.IndexType.FST, null);
+      tableConfig.setFieldConfigList(Arrays.asList(fieldConfig));
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail("Should fail since field name is not persent in schema");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getMessage(),
+          "Column Name myCol21 defined in field config list must be a valid column defined in the schema");
+    }
+  }
+
+  @Test
   public void testValidateIndexingConfig() {
     Schema schema =
         new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java
new file mode 100644
index 0000000..8516dd8
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FSTBasedRegexpLikeQueriesTest.java
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.Serializable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.response.broker.SelectionResults;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+public class FSTBasedRegexpLikeQueriesTest extends BaseQueriesTest {
+  // Dedicated temp directory: setUp()/tearDown() delete it, so it must not be shared with another test class
+  // (it previously reused the "TextSearchQueriesTest" directory name).
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "FSTBasedRegexpLikeQueriesTest");
+  private static final String TABLE_NAME = "MyTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String DOMAIN_NAMES_COL = "DOMAIN_NAMES";
+  private static final String URL_COL = "URL_COL";
+  private static final String INT_COL_NAME = "INT_COL";
+  private static final String NO_INDEX_STRING_COL_NAME = "NO_INDEX_COL";
+  private static final int INT_BASE_VALUE = 1000;
+  private static final int NUM_ROWS = 1024;
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";  // NOTE(review): presumably means "no extra filter clause" for BaseQueriesTest helpers - confirm
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;  // the single segment loaded in setUp()
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;  // setUp() lists the same loaded segment twice
+  }
+
+  /**
+   * Builds the test segment from scratch and loads it with an FST index and an inverted index
+   * configured on DOMAIN_NAMES. The loaded segment is registered twice in the segment list used
+   * by the inter-segment helpers.
+   */
+  @BeforeClass
+  public void setUp() throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    buildSegment();
+
+    IndexLoadingConfig loadingConfig = new IndexLoadingConfig();
+    loadingConfig.setFSTIndexColumns(new HashSet<>(Arrays.asList(DOMAIN_NAMES_COL)));
+    loadingConfig.setInvertedIndexColumns(new HashSet<>(Arrays.asList(DOMAIN_NAMES_COL)));
+
+    File segmentDir = new File(INDEX_DIR, SEGMENT_NAME);
+    ImmutableSegment loadedSegment = ImmutableSegmentLoader.load(segmentDir, loadingConfig);
+    _indexSegment = loadedSegment;
+    _indexSegments = Arrays.asList(loadedSegment, loadedSegment);
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();  // destroy the loaded segment first, then remove its directory
+    FileUtils.deleteQuietly(INDEX_DIR);
+  }
+
+  private List<String> getURLSufficies() {
+    return Arrays.asList("/a", "/b", "/c", "/d");  // 4 URL suffixes, cycled per row by createTestData()
+  }
+
+  private List<String> getNoIndexData() {
+    return Arrays.asList("test1", "test2", "test3", "test4", "test5");  // 5 values, cycled per row
+  }
+
+  /** Returns the 16 domain names used as DOMAIN_NAMES values; createTestData() cycles through them. */
+  private List<String> getDomainNames() {
+    return Arrays.asList("www.domain1.com", "www.domain1.co.ab", "www.domain1.co.bc", "www.domain1.co.cd",
+        "www.sd.domain1.com", "www.sd.domain1.co.ab", "www.sd.domain1.co.bc", "www.sd.domain1.co.cd",
+        "www.domain2.com", "www.domain2.co.ab", "www.domain2.co.bc", "www.domain2.co.cd",
+        "www.sd.domain2.com", "www.sd.domain2.co.ab", "www.sd.domain2.co.bc", "www.sd.domain2.co.cd");
+  }
+
+  /**
+   * Generates numRows rows; row i has INT_COL = INT_BASE_VALUE + i and cycles through the value lists by index.
+   */
+  private List<GenericRow> createTestData(int numRows) throws Exception {
+    List<String> domains = getDomainNames();
+    List<String> suffixes = getURLSufficies();
+    List<String> noIndexValues = getNoIndexData();
+    List<GenericRow> generated = new ArrayList<>();
+    for (int rowId = 0; rowId < numRows; rowId++) {
+      String domainName = domains.get(rowId % domains.size());
+      GenericRow genericRow = new GenericRow();
+      genericRow.putField(INT_COL_NAME, INT_BASE_VALUE + rowId);
+      genericRow.putField(NO_INDEX_STRING_COL_NAME, noIndexValues.get(rowId % noIndexValues.size()));
+      genericRow.putField(DOMAIN_NAMES_COL, domainName);
+      genericRow.putField(URL_COL, domainName + suffixes.get(rowId % suffixes.size()));
+      generated.add(genericRow);
+    }
+    return generated;
+  }
+
+  /**
+   * Builds the test segment: FST index on DOMAIN_NAMES and URL_COL, inverted index on DOMAIN_NAMES only.
+   */
+  private void buildSegment() throws Exception {
+    List<FieldConfig> fstFieldConfigs = new ArrayList<>();
+    for (String fstColumn : new String[]{DOMAIN_NAMES_COL, URL_COL}) {
+      fstFieldConfigs
+          .add(new FieldConfig(fstColumn, FieldConfig.EncodingType.DICTIONARY, FieldConfig.IndexType.FST, null));
+    }
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setInvertedIndexColumns(Arrays.asList(DOMAIN_NAMES_COL)).setFieldConfigList(fstFieldConfigs).build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addSingleValueDimension(DOMAIN_NAMES_COL, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(URL_COL, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(NO_INDEX_STRING_COL_NAME, FieldSpec.DataType.STRING)
+        .addMetric(INT_COL_NAME, FieldSpec.DataType.INT).build();
+    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    generatorConfig.setOutDir(INDEX_DIR.getPath());
+    generatorConfig.setTableName(TABLE_NAME);
+    generatorConfig.setSegmentName(SEGMENT_NAME);
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(createTestData(NUM_ROWS))) {
+      driver.init(generatorConfig, recordReader);
+      driver.build();
+    }
+  }
+
+  /**
+   * Runs {@code query} through both the PQL and the SQL broker path and checks the number of
+   * returned rows; when {@code expectedResults} is non-null the leading rows are compared too.
+   */
+  private void testInterSegmentSelectionQueryHelper(String query, int expectedResultSize,
+      List<Serializable[]> expectedResults) {
+    // PQL: the first column is compared against the string form of the expected value.
+    BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(query);
+    SelectionResults pqlResults = pqlResponse.getSelectionResults();
+    Assert.assertEquals(pqlResults.getColumns().size(), 2);
+    List<Serializable[]> pqlRows = pqlResults.getRows();
+    Assert.assertEquals(pqlRows.size(), expectedResultSize);
+    if (expectedResults != null) {
+      for (int i = 0; i < expectedResults.size(); i++) {
+        Serializable[] actualRow = pqlRows.get(i);
+        Serializable[] expectedRow = expectedResults.get(i);
+        Assert.assertEquals(actualRow[0], String.valueOf(expectedRow[0]));
+        Assert.assertEquals(actualRow[1], expectedRow[1]);
+      }
+    }
+
+    // SQL: result table with an explicit data schema; rows are compared as whole arrays.
+    BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(query);
+    ResultTable resultTable = sqlResponse.getResultTable();
+    DataSchema dataSchema = resultTable.getDataSchema();
+    Assert.assertEquals(dataSchema.size(), 2);
+    Assert.assertEquals(dataSchema.getColumnName(0), "INT_COL");
+    Assert.assertEquals(dataSchema.getColumnName(1), "URL_COL");
+    Assert.assertEquals(dataSchema.getColumnDataType(0), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(dataSchema.getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    List<Object[]> sqlRows = resultTable.getRows();
+    Assert.assertEquals(sqlRows.size(), expectedResultSize);
+    if (expectedResults != null) {
+      for (int i = 0; i < expectedResults.size(); i++) {
+        Object[] actualRow = sqlRows.get(i);
+        Serializable[] expectedRow = expectedResults.get(i);
+        Assert.assertEquals(actualRow, expectedRow);
+      }
+    }
+  }
+
+  /** Runs the query on the single segment, checks the row count and, if given, the expected rows. */
+  private void testSelectionResults(String query, int expectedResultSize, List<Serializable[]> expectedResults)
+      throws Exception {
+    Operator<IntermediateResultsBlock> operator = getOperatorForPqlQuery(query);
+    List<Object[]> actualRows = (List<Object[]>) operator.nextBlock().getSelectionResult();
+    Assert.assertNotNull(actualRows);
+    Assert.assertEquals(actualRows.size(), expectedResultSize);
+    if (expectedResults == null) {
+      return;
+    }
+    // Rows are compared positionally, column by column.
+    for (int rowId = 0; rowId < expectedResultSize; rowId++) {
+      Object[] actual = actualRows.get(rowId);
+      Object[] expected = expectedResults.get(rowId);
+      Assert.assertEquals(actual.length, expected.length);
+      for (int colId = 0; colId < actual.length; colId++) {
+        Assert.assertEquals(actual[colId], expected[colId]);
+      }
+    }
+  }
+
+  /** Runs a group-by query on the single segment and returns its aggregation group-by result. */
+  private AggregationGroupByResult getGroupByResults(String query) throws Exception {
+    AggregationGroupByOperator groupByOperator = getOperatorForPqlQuery(query);
+    IntermediateResultsBlock block = groupByOperator.nextBlock();
+    return block.getAggregationGroupByResult();
+  }
+
+  /** Asserts that {@code result} contains a group with string key {@code key} and count {@code count}. */
+  private void matchGroupResult(AggregationGroupByResult result, String key, long count) {
+    boolean found = false;
+    for (Iterator<GroupKeyGenerator.GroupKey> it = result.getGroupKeyIterator(); it.hasNext(); ) {
+      GroupKeyGenerator.GroupKey groupKey = it.next();
+      if (groupKey._stringKey.equals(key)) {
+        Assert.assertEquals(((Number) result.getResultForKey(groupKey, 0)).longValue(), count);
+        found = true;
+      }
+    }
+    Assert.assertTrue(found, "No group found for key: " + key);
+  }
+
+  /**
+   * Runs a count(*) query via the PQL and SQL broker paths and asserts the value is {@code expectedCount}.
+   */
+  private void testInterSegmentAggregationQueryHelper(String query, long expectedCount) {
+    // PQL: the aggregation value is compared through its string form.
+    List<AggregationResult> pqlResults = getBrokerResponseForPqlQuery(query).getAggregationResults();
+    Assert.assertEquals(pqlResults.size(), 1);
+    Assert.assertEquals(pqlResults.get(0).getValue().toString(), String.valueOf(expectedCount));
+
+    // SQL: a single LONG column named count(*) with exactly one row.
+    ResultTable resultTable = getBrokerResponseForSqlQuery(query).getResultTable();
+    DataSchema dataSchema = resultTable.getDataSchema();
+    Assert.assertEquals(dataSchema.size(), 1);
+    Assert.assertEquals(dataSchema.getColumnName(0), "count(*)");
+    Assert.assertEquals(dataSchema.getColumnDataType(0), DataSchema.ColumnDataType.LONG);
+    List<Object[]> sqlRows = resultTable.getRows();
+    Assert.assertEquals(sqlRows.size(), 1);
+    Assert.assertEquals(sqlRows.get(0).length, 1);
+    Assert.assertEquals(sqlRows.get(0)[0], expectedCount);
+  }
+
+  /** Single-segment selection queries on DOMAIN_NAMES (FST + inverted index) and URL_COL (FST only). */
+  @Test
+  public void testFSTBasedRegexLike() throws Exception {
+    // Select queries on col with FST + inverted index.
+    String query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.sd.domain1.*') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, '.*domain1.*') LIMIT 50000";
+    testSelectionResults(query, 512, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, '.*domain.*') LIMIT 50000";
+    testSelectionResults(query, 1024, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, '.*com') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    // Select queries on col with just FST index.
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, 'www.domain1.*') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, 'www.domain1.*') LIMIT 5";
+    List<Serializable[]> expected = new ArrayList<>();
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    expected.add(new Serializable[]{1002, "www.domain1.co.bc/c"});
+    expected.add(new Serializable[]{1003, "www.domain1.co.cd/d"});
+    expected.add(new Serializable[]{1016, "www.domain1.com/a"});
+    testSelectionResults(query, 5, expected);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, 'www.sd.domain1.*') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*domain1.*') LIMIT 50000";
+    testSelectionResults(query, 512, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*domain.*') LIMIT 50000";
+    testSelectionResults(query, 1024, null);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*domain.*') LIMIT 5";
+    expected = new ArrayList<>();
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    expected.add(new Serializable[]{1002, "www.domain1.co.bc/c"});
+    expected.add(new Serializable[]{1003, "www.domain1.co.cd/d"});
+    expected.add(new Serializable[]{1004, "www.sd.domain1.com/a"});
+    testSelectionResults(query, 5, expected);
+
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') LIMIT 50000";
+    testSelectionResults(query, 256, null);
+
+    // Suffix "/a" belongs to every 4th row (i % 4 == 0), so the first five matches are rows 0, 4, 8, 12 and 16.
+    query = "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') LIMIT 5";
+    expected = new ArrayList<>();
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1004, "www.sd.domain1.com/a"});
+    expected.add(new Serializable[]{1008, "www.domain2.com/a"});
+    expected.add(new Serializable[]{1012, "www.sd.domain2.com/a"});
+    expected.add(new Serializable[]{1016, "www.domain1.com/a"});
+    testSelectionResults(query, 5, expected);
+  }
+
+  @Test
+  public void testFSTBasedRegexpLikeWithOtherFilters()
+      throws Exception {
+    String query;
+    // Row i: domain = i % 16, suffix = i % 4, NO_INDEX value = i % 5. '/a' and 'test1' => i % 20 == 0 => 52 rows.
+    // Select queries on columns with combination of FST Index , (FST + Inverted Index), No index and other constraints.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testSelectionResults(query, 52, null);
+    // '/b' (i % 4 == 1) and 'test1' (i % 5 == 0) => i % 20 == 5 => 51 of the 1024 rows.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testSelectionResults(query, 51, null);
+    // 'www.domain1.*' (i % 16 in 0..3) with '/a' leaves i % 16 == 0; with 'test1' => i % 80 == 0 => 13 rows.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') AND REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testSelectionResults(query, 13, null);
+    // The 'www.domain1.co.*' domains sit at i % 16 in 1..3 and never carry the '/a' suffix => no rows.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.co\\..*') AND REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testSelectionResults(query, 0, null);
+    // With '/b' only i % 16 == 1 remains; together with 'test1' the first match is row 65, then every 80 => 12 rows.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.co\\..*') AND REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testSelectionResults(query, 12, null);
+    // INT_COL=1000 pins the result to row 0.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') and INT_COL=1000 LIMIT 50000";
+    List<Serializable[]> expected = new ArrayList<>();
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    testSelectionResults(query, 1, expected);
+    // INT_COL=1001 pins the result to row 1 (domain www.domain1.co.ab, suffix /b, NO_INDEX value test2).
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') AND REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test2') and INT_COL=1001 LIMIT 50000";
+    expected = new ArrayList<>();
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    testSelectionResults(query, 1, expected);
+  }
+
+  @Test
+  public void testGroupByOnFSTBasedRegexpLike()
+      throws Exception {
+    String query;  // single-segment checks; each domain / URL value repeats every 16 rows (1024 / 16 = 64)
+    query =
+        "SELECT DOMAIN_NAMES, count(*) FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') group by DOMAIN_NAMES LIMIT 50000";
+    AggregationGroupByResult result = getGroupByResults(query);
+    matchGroupResult(result, "www.domain1.com", 64);
+    matchGroupResult(result, "www.domain1.co.ab", 64);
+    matchGroupResult(result, "www.domain1.co.bc", 64);
+    matchGroupResult(result, "www.domain1.co.cd", 64);
+    // Adding 'test1' (i % 5 == 0) keeps one row in 80 per URL, i.e. 13 rows for each '/a' URL.
+    query =
+        "SELECT URL_COL, count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') group by URL_COL LIMIT 5000";
+    result = getGroupByResults(query);
+    matchGroupResult(result, "www.domain1.com/a", 13);
+    matchGroupResult(result, "www.sd.domain1.com/a", 13);
+    matchGroupResult(result, "www.domain2.com/a", 13);
+    matchGroupResult(result, "www.sd.domain2.com/a", 13);
+    // Same for '/b'; 'www.domain1.co.ab/b' first matches at row 65, so it only reaches 12 within 1024 rows.
+    query =
+        "SELECT URL_COL, count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') group by URL_COL LIMIT 5000";
+    result = getGroupByResults(query);
+    matchGroupResult(result, "www.domain1.co.ab/b", 12);
+    matchGroupResult(result, "www.sd.domain1.co.ab/b", 13);
+    matchGroupResult(result, "www.domain2.co.ab/b", 13);
+    matchGroupResult(result, "www.sd.domain2.co.ab/b", 13);
+    // INT_COL > 1005 additionally drops row 5 ('www.sd.domain1.co.ab/b'), lowering that group to 12.
+    query =
+        "SELECT URL_COL, count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') AND INT_COL > 1005 group by URL_COL LIMIT 5000";
+    result = getGroupByResults(query);
+    matchGroupResult(result, "www.domain1.co.ab/b", 12);
+    matchGroupResult(result, "www.sd.domain1.co.ab/b", 12);
+    matchGroupResult(result, "www.domain2.co.ab/b", 13);
+    matchGroupResult(result, "www.sd.domain2.co.ab/b", 13);
+    // 'www.domain1.*/a' only matches 'www.domain1.com/a', which occurs once every 16 rows => 64.
+    query =
+        "SELECT URL_COL, count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, 'www.domain1.*/a') group by URL_COL LIMIT 50000";
+    result = getGroupByResults(query);
+    matchGroupResult(result, "www.domain1.com/a", 64);
+  }
+
+  @Test
+  public void testInterSegment() {
+    String query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') AND REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test2') and INT_COL=1001 LIMIT 50000";
+    List<Serializable[]> expected = new ArrayList<>();
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    expected.add(new Serializable[]{1001, "www.domain1.co.ab/b"});
+    testInterSegmentSelectionQueryHelper(query, 4, expected);
+    // Broker-level responses contain every matching row four times, so all counts below are 4x the per-segment ones.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') and INT_COL=1000 LIMIT 50000";
+    expected = new ArrayList<>();
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    expected.add(new Serializable[]{1000, "www.domain1.com/a"});
+    testInterSegmentSelectionQueryHelper(query, 4, expected);
+    // 12 matching rows per segment => 48.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.co\\..*') AND REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') ORDER  BY INT_COL LIMIT 5000";
+    testInterSegmentSelectionQueryHelper(query, 48, null);
+
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.co\\..*') AND REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testInterSegmentSelectionQueryHelper(query, 0, null);
+    // 13 matching rows per segment => 52.
+    query =
+        "SELECT INT_COL, URL_COL FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*') AND REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1') LIMIT 50000";
+    testInterSegmentSelectionQueryHelper(query, 52, null);
+    // 64 rows per segment carry 'www.domain1.com/a' => 256.
+    query = "SELECT count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, 'www.domain1.*/a')";
+    testInterSegmentAggregationQueryHelper(query, 256);
+    // 51 rows match '/b' and 'test1'; INT_COL > 1005 removes row 5 => 50 per segment => 200.
+    query =
+        "SELECT count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1') AND INT_COL > 1005 ";
+    testInterSegmentAggregationQueryHelper(query, 200);
+
+    query = "SELECT count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/b') and REGEXP_LIKE(NO_INDEX_COL, 'test1')";
+    testInterSegmentAggregationQueryHelper(query, 204);
+
+    query = "SELECT count(*) FROM MyTable WHERE REGEXP_LIKE(URL_COL, '.*/a') and REGEXP_LIKE(NO_INDEX_COL, 'test1')";
+    testInterSegmentAggregationQueryHelper(query, 208);
+    // 256 rows per segment have a 'www.domain1.*' domain => 1024.
+    query = "SELECT count(*) FROM MyTable WHERE REGEXP_LIKE(DOMAIN_NAMES, 'www.domain1.*')";
+    testInterSegmentAggregationQueryHelper(query, 1024);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index 4caa8e0..dd71934 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -86,7 +86,8 @@ import org.testng.annotations.Test;
  * The test table has a SKILLS column and QUERY_LOG column. Text index is created
  * on each of these columns.
  */
-public class TextSearchQueriesTest extends BaseQueriesTest {  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "TextSearchQueriesTest");
+public class TextSearchQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "TextSearchQueriesTest");
   private static final String TABLE_NAME = "MyTable";
   private static final String SEGMENT_NAME = "testSegment";
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/util/FSTBuilderTest.java b/pinot-core/src/test/java/org/apache/pinot/util/FSTBuilderTest.java
new file mode 100644
index 0000000..1d3fe91
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/util/FSTBuilderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.util;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.lucene.store.OutputStreamDataOutput;
+import org.apache.lucene.util.automaton.Automaton;
+import org.apache.lucene.util.automaton.RegExp;
+import org.apache.lucene.util.fst.FST;
+import org.apache.lucene.util.fst.OffHeapFSTStore;
+import org.apache.lucene.util.fst.Outputs;
+import org.apache.lucene.util.fst.PositiveIntOutputs;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.util.fst.FSTBuilder;
+import org.apache.pinot.core.util.fst.RegexpMatcher;
+import org.apache.pinot.core.util.fst.PinotBufferIndexInput;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/** Tests for {@link FSTBuilder} and {@link RegexpMatcher}. */
+public class FSTBuilderTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "FST");
+
+  /** Recreates the scratch directory so every run starts from an empty one. */
+  @BeforeClass
+  public void setUp() throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+    TEMP_DIR.mkdirs();
+  }
+
+  /** Checks {@link RegexpMatcher#match} on inputs that should and should not match the pattern. */
+  @Test
+  public void testRegexMatch() {
+    RegexpMatcher regexpMatcher = new RegexpMatcher("hello.*ld", null);
+    Assert.assertTrue(regexpMatcher.match("helloworld"));
+    // Same input again: reusing the matcher must not change the answer.
+    Assert.assertTrue(regexpMatcher.match("helloworld"));
+    Assert.assertTrue(regexpMatcher.match("helloasdfworld"));
+    Assert.assertFalse(regexpMatcher.match("ahelloasdfworld"));
+  }
+
+  /** Saves an FST to a file, maps it back, and runs the regex lookups on both the in-memory and reloaded copy. */
+  @Test
+  public void testFSTBuilder() throws IOException {
+    SortedMap<String, Integer> x = new TreeMap<>();
+    x.put("hello-world", 12);
+    x.put("hello-world123", 21);
+    x.put("still", 123);
+    FST<Long> fst = FSTBuilder.buildFST(x);
+    File fstFile = new File(TEMP_DIR, "test.lucene");
+    try (FileOutputStream fileOutputStream = new FileOutputStream(fstFile)) {
+      fst.save(new OutputStreamDataOutput(fileOutputStream));
+    }
+    assertMatches(fst);
+
+    Outputs<Long> outputs = PositiveIntOutputs.getSingleton();
+    // Close the mapped buffer before tearDown() deletes the directory.
+    try (PinotDataBuffer pinotDataBuffer =
+        PinotDataBuffer.mapFile(fstFile, true, 0, fstFile.length(), ByteOrder.BIG_ENDIAN, "")) {
+      PinotBufferIndexInput indexInput = new PinotBufferIndexInput(pinotDataBuffer, 0L, fstFile.length());
+      // Query the FST read back from disk; querying only the in-memory one never exercises the saved file.
+      assertMatches(new FST<>(indexInput, outputs, new OffHeapFSTStore()));
+    }
+  }
+
+  /** Asserts that each of the two test regexes hits exactly one key of {@code fst} with the expected value. */
+  private static void assertMatches(FST<Long> fst) throws IOException {
+    List<Long> results = RegexpMatcher.regexMatch("hello.*123", fst);
+    Assert.assertEquals(results.size(), 1);
+    Assert.assertEquals(results.get(0).longValue(), 21L);
+    results = RegexpMatcher.regexMatch(".*world", fst);
+    Assert.assertEquals(results.size(), 1);
+    Assert.assertEquals(results.get(0).longValue(), 12L);
+  }
+
+  /** Removes the scratch directory. */
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
diff --git a/pinot-core/src/test/resources/data/newColumnsSchema4.json b/pinot-core/src/test/resources/data/newColumnsSchema4.json
new file mode 100644
index 0000000..2141b0b
--- /dev/null
+++ b/pinot-core/src/test/resources/data/newColumnsSchema4.json
@@ -0,0 +1,84 @@
+{
+  "schemaName": "testDataSchema4",
+  "dimensionFieldSpecs": [
+    {
+      "name": "column1",
+      "dataType": "INT"
+    },
+    {
+      "name": "column2",
+      "dataType": "INT"
+    },
+    {
+      "name": "column3",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column4",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column5",
+      "dataType": "STRING"
+    },
+    {
+      "name": "newFSTColDict",
+      "dataType": "STRING"
+    },
+    {
+      "name": "newFSTColRaw",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column6",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "column7",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "column8",
+      "dataType": "INT"
+    },
+    {
+      "name": "column9",
+      "dataType": "INT"
+    },
+    {
+      "name": "column10",
+      "dataType": "INT"
+    },
+    {
+      "name": "column13",
+      "dataType": "INT"
+    },
+    {
+      "name": "column14",
+      "dataType": "STRING"
+    },
+    {
+      "name": "column15",
+      "dataType": "STRING"
+    },
+    {
+      "name": "weeksSinceEpochSunday",
+      "dataType": "INT"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "count",
+      "dataType": "INT"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "timeType": "DAYS",
+      "dataType": "INT",
+      "name": "daysSinceEpoch"
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
index c9e1eb6..8e9d32b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/FieldConfig.java
@@ -67,7 +67,7 @@ public class FieldConfig extends BaseJsonConfig {
 
   // If null, there won't be any index
   public enum IndexType {
-    INVERTED, SORTED, TEXT
+    INVERTED, SORTED, TEXT, FST
   }
 
   public String getName() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org