You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ki...@apache.org on 2020/05/14 15:21:40 UTC

[incubator-pinot] branch master updated: Supporting range queries using indexes (#5240)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 602f28a  Supporting range queries using indexes (#5240)
602f28a is described below

commit 602f28acae56dd598197a59ac2b425ea40149f08
Author: Kishore Gopalakrishna <g....@gmail.com>
AuthorDate: Thu May 14 08:21:28 2020 -0700

    Supporting range queries using indexes (#5240)
    
    * Adding range index support
    
    * Adding comments
    
    * Make range index to store start of all ranges and end of last range
    
    * Adding comments and fixing broken tests
    
    * More comments
    
    Co-authored-by: Xiang Fu <fx...@gmail.com>
---
 .../pinot/core/common/BlockMultiValIterator.java   |   4 +-
 .../org/apache/pinot/core/common/DataSource.java   |   6 +
 .../indexsegment/mutable/MutableSegmentImpl.java   |   4 +-
 .../DictionaryBasedMultiValueIterator.java         | 108 ++++
 ...ava => DictionaryBasedSingleValueIterator.java} |  43 +-
 .../docvaliterators/MultiValueIterator.java        |  25 +
 .../core/operator/filter/FilterOperatorUtils.java  |   6 +
 .../core/operator/filter/RangeFilterOperator.java  | 114 +++++
 .../filter/predicate/PredicateEvaluator.java       |   3 +
 .../org/apache/pinot/core/plan/FilterPlanNode.java |   1 -
 .../core/segment/creator/impl/V1Constants.java     |   1 +
 .../creator/impl/inv/RangeIndexCreator.java        | 542 +++++++++++++++++++++
 .../segment/index/column/ColumnIndexContainer.java |   5 +
 .../index/column/PhysicalColumnIndexContainer.java |  20 +-
 .../segment/index/datasource/BaseDataSource.java   |  11 +-
 .../index/datasource/ImmutableDataSource.java      |   2 +-
 .../index/datasource/MutableDataSource.java        |   4 +-
 .../segment/index/loader/IndexLoadingConfig.java   |  18 +
 .../segment/index/loader/SegmentPreProcessor.java  |   6 +
 .../loader/invertedindex/RangeIndexHandler.java    | 169 +++++++
 .../segment/index/metadata/SegmentMetadata.java    |   2 +
 .../index/metadata/SegmentMetadataImpl.java        |   5 +
 .../segment/index/readers/RangeIndexReader.java    | 230 +++++++++
 .../pinot/core/segment/store/ColumnIndexType.java  |   3 +-
 .../core/segment/store/FilePerIndexDirectory.java  |   3 +
 .../virtualcolumn/VirtualColumnIndexContainer.java |   5 +
 .../core/startree/v2/store/StarTreeDataSource.java |   3 +-
 .../core/common/RealtimeNoDictionaryTest.java      |  12 +-
 .../index/creator/RangeIndexCreatorTest.java       | 161 ++++++
 .../tests/BaseClusterIntegrationTest.java          |   8 +-
 .../pinot/integration/tests/ClusterTest.java       |  42 +-
 .../ControllerPeriodicTasksIntegrationTests.java   |   9 +-
 .../tests/HybridClusterIntegrationTest.java        |   2 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       |  41 +-
 .../tests/SimpleMinionClusterIntegrationTest.java  |   8 +-
 .../pinot/spi/config/table/IndexingConfig.java     |   9 +
 .../spi/utils/builder/TableConfigBuilder.java      |   7 +
 .../tools/scan/query/RangePredicateFilter.java     |   8 +
 39 files changed, 1596 insertions(+), 56 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
index 69d5da5..68c0fc5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockMultiValIterator.java
@@ -24,7 +24,7 @@ package org.apache.pinot.core.common;
  */
 public abstract class BlockMultiValIterator implements BlockValIterator {
 
-  public int nextCharVal(int[] charArray) {
+  public int nextCharVal(char[] charArray) {
     throw new UnsupportedOperationException();
   }
 
@@ -44,7 +44,7 @@ public abstract class BlockMultiValIterator implements BlockValIterator {
     throw new UnsupportedOperationException();
   }
 
-  public byte[][] nextBytesArrayVal(byte[][] bytesArrays) {
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
     throw new UnsupportedOperationException();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
index 8f33931..492c344 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataSource.java
@@ -55,6 +55,12 @@ public abstract class DataSource extends BaseOperator {
   public abstract InvertedIndexReader getInvertedIndex();
 
   /**
+   * Returns the inverted index for the column if exists, or {@code null} if not.
+   */
+  @Nullable
+  public abstract InvertedIndexReader getRangeIndex();
+
+  /**
    * Returns the bloom filter for the column if exists, or {@code null} if not.
    */
   @Nullable
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index 39d7a30..74af7cc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -114,6 +114,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final Map<String, BaseMutableDictionary> _dictionaryMap = new HashMap<>();
   private final Map<String, DataFileReader> _indexReaderWriterMap = new HashMap<>();
   private final Map<String, InvertedIndexReader> _invertedIndexMap = new HashMap<>();
+  private final Map<String, InvertedIndexReader> _rangeIndexMap = new HashMap<>();
   private final Map<String, BloomFilterReader> _bloomFilterMap = new HashMap<>();
   private final Map<String, RealtimeNullValueVectorReaderWriter> _nullValueVectorMap = new HashMap<>();
   private final IdMap<FixedIntArray> _recordIdMap;
@@ -621,11 +622,12 @@ public class MutableSegmentImpl implements MutableSegment {
       DataFileReader forwardIndex = _indexReaderWriterMap.get(column);
       BaseMutableDictionary dictionary = _dictionaryMap.get(column);
       InvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
+      InvertedIndexReader rangeIndex = _rangeIndexMap.get(column);
       BloomFilterReader bloomFilter = _bloomFilterMap.get(column);
       RealtimeNullValueVectorReaderWriter nullValueVector = _nullValueVectorMap.get(column);
       return new MutableDataSource(fieldSpec, _numDocsIndexed, numValuesInfo.getNumValues(),
           numValuesInfo.getMaxNumValuesPerMVEntry(), partitionFunction, partitionId, forwardIndex, dictionary,
-          invertedIndex, bloomFilter, nullValueVector);
+          invertedIndex, rangeIndex, bloomFilter, nullValueVector);
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
new file mode 100644
index 0000000..9f3dc2a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedMultiValueIterator.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.docvaliterators;
+
+import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
+import org.apache.pinot.core.io.reader.ReaderContext;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+@SuppressWarnings("unchecked")
+public final class DictionaryBasedMultiValueIterator extends BlockMultiValIterator {
+
+  private final SingleColumnMultiValueReader _reader;
+  private final int _numDocs;
+  private final ReaderContext _context;
+  private final Dictionary _dictionary;
+  private final int[] _dictIds;
+
+  private int _nextDocId;
+
+  public DictionaryBasedMultiValueIterator(SingleColumnMultiValueReader reader, Dictionary dictionary, int numDocs,
+      int maxLength) {
+    _reader = reader;
+    _numDocs = numDocs;
+    _context = _reader.createContext();
+    _dictionary = dictionary;
+    _dictIds = new int[maxLength];
+  }
+
+  @Override
+  public int nextIntVal(int[] intArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      intArray[i] = _dictionary.getIntValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextDoubleVal(double[] doubleArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      doubleArray[i] = _dictionary.getDoubleValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextFloatVal(float[] floatArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      floatArray[i] = _dictionary.getFloatValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextLongVal(long[] longArray) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      longArray[i] = _dictionary.getLongValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
+    int length = _reader.getIntArray(_nextDocId++, _dictIds, _context);
+    for (int i = 0; i < length; i++) {
+      bytesArrays[i] = _dictionary.getBytesValue(_dictIds[i]);
+    }
+    return length;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return _nextDocId < _numDocs;
+  }
+
+  @Override
+  public void skipTo(int docId) {
+    _nextDocId = docId;
+  }
+
+  @Override
+  public void reset() {
+    _nextDocId = 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
similarity index 52%
copy from pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
index 85ae7c7..34855e0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/DictionaryBasedSingleValueIterator.java
@@ -18,28 +18,57 @@
  */
 package org.apache.pinot.core.operator.docvaliterators;
 
-import org.apache.pinot.core.common.BlockMultiValIterator;
+import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.io.reader.ReaderContext;
-import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 
 
 @SuppressWarnings("unchecked")
-public final class MultiValueIterator extends BlockMultiValIterator {
-  private final SingleColumnMultiValueReader _reader;
+public final class DictionaryBasedSingleValueIterator extends BlockSingleValIterator {
+
+  private final SingleColumnSingleValueReader _reader;
   private final int _numDocs;
   private final ReaderContext _context;
+  private final Dictionary _dictionary;
 
   private int _nextDocId;
 
-  public MultiValueIterator(SingleColumnMultiValueReader reader, int numDocs) {
+  public DictionaryBasedSingleValueIterator(SingleColumnSingleValueReader reader, Dictionary dictionary, int numDocs) {
     _reader = reader;
     _numDocs = numDocs;
     _context = _reader.createContext();
+    _dictionary = dictionary;
+  }
+
+  @Override
+  public int nextIntVal() {
+    return _dictionary.getIntValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public long nextLongVal() {
+    return _dictionary.getLongValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public float nextFloatVal() {
+    return _dictionary.getFloatValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public double nextDoubleVal() {
+    return _dictionary.getDoubleValue(_reader.getInt(_nextDocId++, _context));
+  }
+
+  @Override
+  public String nextStringVal() {
+    return _dictionary.getStringValue(_reader.getInt(_nextDocId++, _context));
   }
 
   @Override
-  public int nextIntVal(int[] intArray) {
-    return _reader.getIntArray(_nextDocId++, intArray, _context);
+  public byte[] nextBytesVal() {
+    return _dictionary.getBytesValue(_reader.getInt(_nextDocId++, _context));
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
index 85ae7c7..4529c84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvaliterators/MultiValueIterator.java
@@ -43,6 +43,31 @@ public final class MultiValueIterator extends BlockMultiValIterator {
   }
 
   @Override
+  public int nextCharVal(char[] charArray) {
+    return _reader.getCharArray(_nextDocId++, charArray);
+  }
+
+  @Override
+  public int nextDoubleVal(double[] doubleArray) {
+    return _reader.getDoubleArray(_nextDocId++, doubleArray);
+  }
+
+  @Override
+  public int nextFloatVal(float[] floatArray) {
+    return _reader.getFloatArray(_nextDocId++, floatArray);
+  }
+
+  @Override
+  public int nextLongVal(long[] longArray) {
+    return _reader.getLongArray(_nextDocId++, longArray);
+  }
+
+  @Override
+  public int nextBytesArrayVal(byte[][] bytesArrays) {
+     return _reader.getBytesArray(_nextDocId++, bytesArrays);
+  }
+
+  @Override
   public boolean hasNext() {
     return _nextDocId < _numDocs;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
index cb5bbb3..ed134b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/FilterOperatorUtils.java
@@ -54,6 +54,12 @@ public class FilterOperatorUtils {
 
     Predicate.Type predicateType = predicateEvaluator.getPredicateType();
 
+    //Only for dictionary encoded columns and offline data sources
+    if (predicateType == Predicate.Type.RANGE && dataSource.getDictionary() != null
+        && dataSource.getRangeIndex() != null) {
+      return new RangeFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
+    }
+
     if (predicateType == Predicate.Type.TEXT_MATCH) {
       return new TextMatchFilterOperator(predicateEvaluator, dataSource, startDocId, endDocId);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
new file mode 100644
index 0000000..8398bef
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeFilterOperator.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.BlockDocIdIterator;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.common.Predicate;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.MVScanDocIdIterator;
+import org.apache.pinot.core.operator.dociditerators.SVScanDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+public class RangeFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "RangeFilterOperator";
+
+  private final PredicateEvaluator _predicateEvaluator;
+  private final DataSource _dataSource;
+  private final int _startDocId;
+  private final int _endDocId;
+  private final boolean _exclusive;
+
+  public RangeFilterOperator(PredicateEvaluator predicateEvaluator, DataSource dataSource, int startDocId,
+      int endDocId) {
+    // NOTE:
+    // Predicate that is always evaluated as true or false should not be passed into the BitmapBasedFilterOperator for
+    // performance concern.
+    // If predicate is always evaluated as true, use MatchAllFilterOperator; if predicate is always evaluated as false,
+    // use EmptyFilterOperator.
+    Preconditions.checkArgument(!predicateEvaluator.isAlwaysTrue() && !predicateEvaluator.isAlwaysFalse());
+
+    _predicateEvaluator = predicateEvaluator;
+    _dataSource = dataSource;
+    _startDocId = startDocId;
+    _endDocId = endDocId;
+    _exclusive = predicateEvaluator.isExclusive();
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+
+    //only dictionary based is supported for now
+    Preconditions.checkState(_predicateEvaluator instanceof RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator);
+
+    RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator evaluator =
+        (RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator) _predicateEvaluator;
+
+    RangeIndexReader rangeIndexReader = (RangeIndexReader) _dataSource.getRangeIndex();
+    int startRangeId = rangeIndexReader.findRangeId(evaluator.getStartDictId());
+    int endRangeId = rangeIndexReader.findRangeId(evaluator.getEndDictId());
+    //Handle Matching Ranges - some ranges match fully but some partially
+    //below code assumes first and last range always match partially which may not be the case always //todo: optimize it
+    MutableRoaringBitmap mutableRoaringBitmap = new MutableRoaringBitmap();
+    mutableRoaringBitmap.or(rangeIndexReader.getDocIds(startRangeId));
+    if (endRangeId != startRangeId) {
+      mutableRoaringBitmap.or(rangeIndexReader.getDocIds(endRangeId));
+    }
+    final ScanBasedFilterOperator scanBasedFilterOperator =
+        new ScanBasedFilterOperator(_predicateEvaluator, _dataSource, _startDocId, _endDocId);
+    FilterBlockDocIdSet scanBlockDocIdSet = scanBasedFilterOperator.getNextBlock().getBlockDocIdSet();
+    BlockDocIdIterator iterator = scanBlockDocIdSet.iterator();
+
+    List<ImmutableRoaringBitmap> bitmapList = new ArrayList<>();
+    if (_dataSource.getDataSourceMetadata().isSingleValue()) {
+      bitmapList.add(((SVScanDocIdIterator) iterator).applyAnd(mutableRoaringBitmap));
+    } else {
+      bitmapList.add(((MVScanDocIdIterator) iterator).applyAnd(mutableRoaringBitmap));
+    }
+
+    //All the intermediate ranges will be full match
+    for (int rangeId = startRangeId + 1; rangeId < endRangeId; rangeId++) {
+      bitmapList.add(rangeIndexReader.getDocIds(rangeId));
+    }
+    ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[bitmapList.size()];
+    bitmapList.toArray(bitmaps);
+    return new FilterBlock(new BitmapDocIdSet(bitmaps, _startDocId, _endDocId, _exclusive) {
+
+      @Override
+      public long getNumEntriesScannedInFilter() {
+        return scanBlockDocIdSet.getNumEntriesScannedInFilter();
+      }
+    });
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
index a24e492..09c8768 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/PredicateEvaluator.java
@@ -22,6 +22,8 @@ import org.apache.pinot.core.common.Predicate;
 
 
 public interface PredicateEvaluator {
+
+
   /**
    * APIs for both dictionary based and raw value based predicate evaluator
    */
@@ -180,4 +182,5 @@ public interface PredicateEvaluator {
    * @return Whether the entry matches the predicate
    */
   boolean applyMV(byte[][] values, int length);
+
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 0ddee87..2b1b29f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -117,7 +117,6 @@ public class FilterPlanNode implements PlanNode {
 
       TransformExpressionTree expression = filterQueryTree.getExpression();
       if (expression.getExpressionType() == TransformExpressionTree.ExpressionType.FUNCTION) {
-
         return new ExpressionFilterOperator(segment, expression, predicate);
       } else {
         DataSource dataSource = segment.getDataSource(filterQueryTree.getColumn());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index c3316cb..3dc60aa 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -36,6 +36,7 @@ public class V1Constants {
     public static final String RAW_SV_FORWARD_INDEX_FILE_EXTENSION = ".sv.raw.fwd";
     public static final String UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION = ".mv.fwd";
     public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
+    public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
     public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
     public static final String NULLVALUE_VECTOR_FILE_EXTENSION = ".bitmap.nullvalue";
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
new file mode 100644
index 0000000..ab33ea2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
@@ -0,0 +1,542 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator.impl.inv;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.Arrays;
+import it.unimi.dsi.fastutil.Swapper;
+import it.unimi.dsi.fastutil.ints.IntComparator;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.query.utils.Pair;
+import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Implementation of {@link InvertedIndexCreator} that uses off-heap memory.
+ * <p>We use 2 passes to create the range index.
+ * <ul>
+ *
+ *   <li>
+ *     A
+ *   </li>
+ *   <li>
+ *     In the first pass (adding values phase), when add() method is called, store the raw values into the
+ *     value buffer (for multi-valued column we flatten the values).
+ *     We also store the corresponding docId in docIdBuffer which will be sorted in the next phase based on the value in valueBuffer.
+ *
+ *   </li>
+ *   <li>
+ *     In the second pass (processing values phase), when seal() method is called, we sort the docIdBuffer based on the value in valueBuffer.
+ *     We then iterate over the sorted docIdBuffer and create ranges such that each range comprises of _numDocsPerRange.
+ *     While
+ *   </li>
+ * </ul>
+ */
+public final class RangeIndexCreator implements InvertedIndexCreator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexCreator.class);
+
+  //This will dump the content of temp buffers and ranges
+  private static final boolean TRACE = false;
+
+  private static final int RANGE_INDEX_VERSION = 1;
+
+  private static final int DEFAULT_NUM_RANGES = 20;
+
+  private static final String VALUE_BUFFER_SUFFIX = "val.buf";
+  private static final String DOC_ID_VALUE_BUFFER_SUFFIX = ".doc.id.buf";
+
+  //output file which will hold the range index
+  private final File _rangeIndexFile;
+
+  //File where the input values will be stored. This is a temp file that will be deleted at the end
+  private final File _tempValueBufferFile;
+  //pinot data buffer MMapped - maps the content of _tempValueBufferFile
+  private PinotDataBuffer _tempValueBuffer;
+  //a simple wrapper over _tempValueBuffer to make it easy to read/write any Number (INT,LONG, FLOAT, DOUBLE)
+  private NumberValueBuffer _numberValueBuffer;
+
+  //File where the docId will be stored. Temp file that will be deleted at the end
+  private final File _tempDocIdBufferFile;
+  //pinot data buffer MMapped - maps the content of _tempDocIdBufferFile
+  private PinotDataBuffer _docIdValueBuffer;
+  //a simple wrapper over _docIdValueBuffer to make it easy to read/write any INT
+  private IntValueBuffer _docIdBuffer;
+
+  private final int _numValues;
+  private int _nextDocId;
+  private int _nextValueId;
+  private int _numDocsPerRange;
+  private FieldSpec.DataType _valueType;
+
+  /**
+   *
+   * @param indexDir destination of the range index file
+   * @param fieldSpec fieldspec of the column to generate the range index
+   * @param valueType DataType of the column, INT if dictionary encoded, or INT, FLOAT, LONG, DOUBLE for raw encoded
+   * @param numRanges customize the number of ranges, if -1, we use DEFAULT_NUM_RANGES;
+   * @param numDocsPerRange customize the number of Docs Per Range, if -1 numDocsPerRange = totalValues/numRanges
+   * @param numDocs total number of documents
+   * @param numValues total number of values, used for Multi value columns (for single value columns numDocs== numValues)
+   * @throws IOException
+   */
+  public RangeIndexCreator(File indexDir, FieldSpec fieldSpec, FieldSpec.DataType valueType, int numRanges,
+      int numDocsPerRange, int numDocs, int numValues)
+      throws IOException {
+    _valueType = valueType;
+    String columnName = fieldSpec.getName();
+    _rangeIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    _tempValueBufferFile = new File(indexDir, columnName + VALUE_BUFFER_SUFFIX);
+    _tempDocIdBufferFile = new File(indexDir, columnName + DOC_ID_VALUE_BUFFER_SUFFIX);
+    _numValues = fieldSpec.isSingleValueField() ? numDocs : numValues;
+    int valueSize = valueType.size();
+    try {
+      //use DEFAULT_NUM_RANGES if numRanges is not set
+      if (numRanges < 0) {
+        numRanges = DEFAULT_NUM_RANGES;
+      }
+      if (numDocsPerRange < 0) {
+        _numDocsPerRange = (int) Math.ceil(_numValues / numRanges);
+      }
+
+      //Value buffer to store the values added via add method
+      _tempValueBuffer = createTempBuffer((long) _numValues * valueSize, _tempValueBufferFile);
+
+      switch (_valueType) {
+        case INT:
+          _numberValueBuffer = new IntValueBuffer(_tempValueBuffer);
+          break;
+        case FLOAT:
+          _numberValueBuffer = new FloatValueBuffer(_tempValueBuffer);
+          break;
+        case LONG:
+          _numberValueBuffer = new LongValueBuffer(_tempValueBuffer);
+          break;
+        case DOUBLE:
+          _numberValueBuffer = new DoubleValueBuffer(_tempValueBuffer);
+          break;
+        default:
+          throw new UnsupportedOperationException("Range index is not supported for columns of data type:" + valueType);
+      }
+
+      //docId  Buffer
+      _docIdValueBuffer = createTempBuffer((long) _numValues * Integer.BYTES, _tempDocIdBufferFile);
+      _docIdBuffer = new IntValueBuffer(_docIdValueBuffer);
+    } catch (Exception e) {
+      destroyBuffer(_tempValueBuffer, _tempValueBufferFile);
+      destroyBuffer(_tempValueBuffer, _tempDocIdBufferFile);
+      throw e;
+    }
+  }
+
+  @Override
+  public void add(int dictId) {
+    _numberValueBuffer.put(_nextDocId, dictId);
+    _docIdBuffer.put(_nextDocId, _nextDocId);
+    _nextDocId = _nextDocId + 1;
+  }
+
+  @Override
+  public void add(int[] dictIds, int length) {
+    for (int i = 0; i < length; i++) {
+      int dictId = dictIds[i];
+      _numberValueBuffer.put(_nextValueId, dictId);
+      _docIdBuffer.put(_nextValueId, _nextDocId);
+      _nextValueId = _nextValueId + 1;
+    }
+    _nextDocId = _nextDocId + 1;
+  }
+
+  @Override
+  public void addDoc(Object document, int docIdCounter) {
+    throw new IllegalStateException("Range index creator does not support Object type currently");
+  }
+
+  /**
+   * Generates the range Index file
+   * Sample output by running RangeIndexCreatorTest with TRACE=true and change log4.xml in core to info
+   * 15:18:47.330 RangeIndexCreator - Before sorting
+   * 15:18:47.333 RangeIndexCreator - DocIdBuffer  [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, ]
+   * 15:18:47.333 RangeIndexCreator - ValueBuffer  [ 3, 0, 0, 0, 3, 1, 3, 0, 2, 4, 4, 2, 4, 3, 2, 1, 0, 2, 0, 3, ]
+   * 15:18:47.371 RangeIndexCreator - After sorting
+   * 15:18:47.371 RangeIndexCreator - DocIdBuffer  [ 16, 3, 1, 2, 7, 18, 15, 5, 14, 8, 17, 11, 0, 4, 6, 13, 19, 10, 9, 12, ]
+   * 15:18:47.371 RangeIndexCreator - ValueBuffer  [ 0, 0, 0, 0, 0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, ]
+   * 15:18:47.372 RangeIndexCreator - rangeOffsets = [ (0,5) ,(6,7) ,(8,11) ,(12,16) ,(17,19) , ]
+   * 15:18:47.372 RangeIndexCreator - rangeValues = [ (0,0) ,(1,1) ,(2,2) ,(3,3) ,(4,4) , ]
+   *
+   * @throws IOException
+   */
+  @Override
+  public void seal()
+      throws IOException {
+    if (TRACE) {
+      LOGGER.info("Before sorting");
+      dump();
+    }
+
+    //Sorts the value buffer while maintaining the mapping with the docId.
+    //The mapping is needed  in the subsequent phase where we generate the bitmap for each range.
+    IntComparator comparator = (i, j) -> {
+      Number val1 = _numberValueBuffer.get(i);
+      Number val2 = _numberValueBuffer.get(j);
+      return _numberValueBuffer.compare(val1, val2);
+    };
+    Swapper swapper = (i, j) -> {
+      Number temp = _docIdBuffer.get(i).intValue();
+      _docIdBuffer.put(i, _docIdBuffer.get(j).intValue());
+      _docIdBuffer.put(j, temp);
+
+      Number tempValue = _numberValueBuffer.get(i);
+      _numberValueBuffer.put(i, _numberValueBuffer.get(j));
+      _numberValueBuffer.put(j, tempValue);
+    };
+    Arrays.quickSort(0, _numValues, comparator, swapper);
+
+    if (TRACE) {
+      LOGGER.info("After sorting");
+      dump();
+    }
+    //FIND THE RANGES
+    //go over the sorted value to compute ranges
+    List<Pair<Integer, Integer>> ranges = new ArrayList<>();
+
+    int boundary = _numDocsPerRange;
+    int start = 0;
+    for (int i = 0; i < _numValues; i++) {
+      if (i > start + boundary) {
+        if (comparator.compare(i, i - 1) != 0) {
+          ranges.add(new Pair(start, i - 1));
+          start = i;
+        }
+      }
+    }
+    ranges.add(new Pair(start, _numValues - 1));
+
+    //Dump ranges
+    if (TRACE) {
+      dumpRanges(ranges);
+    }
+    // RANGE INDEX FILE LAYOUT
+    //HEADER
+    //   # VERSION (INT)
+    //   # DATA_TYPE (String -> INT (length) (ACTUAL BYTES)
+    //   # Number OF RANGES (INT)
+    //   <RANGE START VALUE BUFFER> # (R + 1 )* ValueSize
+    //   Range Start 0,
+    //    .........
+    //   Range Start R - 1
+    //   Range MAX VALUE
+    //   Bitmap for Range 0 Start Offset
+    //      .....
+    //   Bitmap for Range R Start Offset
+    //BODY
+    //   Bitmap for range 0
+    //   Bitmap for range 2
+    //    ......
+    //   Bitmap for range R - 1
+    long bytesWritten = 0;
+    try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(_rangeIndexFile));
+        DataOutputStream header = new DataOutputStream(bos);
+        FileOutputStream fos = new FileOutputStream(_rangeIndexFile);
+        DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(fos))) {
+
+      //VERSION
+      header.writeInt(RANGE_INDEX_VERSION);
+
+      bytesWritten += Integer.BYTES;
+
+      //value data type
+      byte[] valueDataTypeBytes = _valueType.name().getBytes(UTF_8);
+      header.writeInt(valueDataTypeBytes.length);
+      bytesWritten += Integer.BYTES;
+
+      header.write(valueDataTypeBytes);
+      bytesWritten += valueDataTypeBytes.length;
+
+      //Write the number of ranges
+      header.writeInt(ranges.size());
+      bytesWritten += Integer.BYTES;
+
+      //write the range start values
+      for (Pair<Integer, Integer> range : ranges) {
+        Number rangeStart = _numberValueBuffer.get(range.getFirst());
+        writeNumberToHeader(header, rangeStart);
+      }
+      bytesWritten += ranges.size() * _valueType.size(); // Range start values
+
+      Number lastRangeEnd = _numberValueBuffer.get(ranges.get(ranges.size() - 1).getSecond());
+      writeNumberToHeader(header, lastRangeEnd);
+      bytesWritten += _valueType.size(); // Last range end value
+
+      //compute the offset where the bitmap for the first range would be written
+      //bitmap start offset for each range, one extra to make it easy to get the length for last one.
+      long bitmapOffsetHeaderSize = (ranges.size() + 1) * Long.BYTES;
+
+      long bitmapOffset = bytesWritten + bitmapOffsetHeaderSize;
+      header.writeLong(bitmapOffset);
+      bytesWritten += Long.BYTES;
+      fos.getChannel().position(bitmapOffset);
+
+      for (int i = 0; i < ranges.size(); i++) {
+        Pair<Integer, Integer> range = ranges.get(i);
+        MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+        for (int index = range.getFirst(); index <= range.getSecond(); index++) {
+          bitmap.add(_docIdBuffer.get(index).intValue());
+        }
+        // Write offset and bitmap into file
+        int sizeInBytes = bitmap.serializedSizeInBytes();
+        bitmapOffset += sizeInBytes;
+
+        // Check for int overflow
+        Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _rangeIndexFile);
+
+        header.writeLong(bitmapOffset);
+        bytesWritten += Long.BYTES;
+
+        byte[] bytes = new byte[sizeInBytes];
+        bitmap.serialize(ByteBuffer.wrap(bytes));
+        dataOutputStream.write(bytes);
+        bytesWritten += bytes.length;
+      }
+    } catch (IOException e) {
+      FileUtils.deleteQuietly(_rangeIndexFile);
+      throw e;
+    }
+    Preconditions.checkState(bytesWritten == _rangeIndexFile.length(),
+        "Length of inverted index file: " + _rangeIndexFile.length() + " does not match the number of bytes written: "
+            + bytesWritten);
+  }
+
+  private void writeNumberToHeader(DataOutputStream header, Number number)
+      throws IOException {
+    switch (_valueType) {
+      case INT:
+        header.writeInt(number.intValue());
+        break;
+      case LONG:
+        header.writeLong(number.longValue());
+        break;
+      case FLOAT:
+        header.writeFloat(number.floatValue());
+        break;
+      case DOUBLE:
+        header.writeDouble(number.doubleValue());
+        break;
+      default:
+        throw new RuntimeException("Range index not supported for dataType: " + _valueType);
+    }
+  }
+
+  private void dumpRanges(List<Pair<Integer, Integer>> ranges) {
+    StringBuilder rangeOffsets = new StringBuilder("[ ");
+    StringBuilder rangeValues = new StringBuilder("[ ");
+    for (Pair<Integer, Integer> range : ranges) {
+      rangeOffsets.append("(").append(range.getFirst()).append(",").append(range.getSecond()).append(") ,");
+      rangeValues.append("(").append(_numberValueBuffer.get(range.getFirst())).append(",")
+          .append(_numberValueBuffer.get(range.getSecond())).append(") ,");
+    }
+    rangeOffsets.append(" ]");
+    rangeValues.append(" ]");
+    LOGGER.info("rangeOffsets = " + rangeOffsets);
+    LOGGER.info("rangeValues = " + rangeValues);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    org.apache.pinot.common.utils.FileUtils.close(new DataBufferAndFile(_tempValueBuffer, _tempValueBufferFile),
+        new DataBufferAndFile(_docIdValueBuffer, _tempDocIdBufferFile));
+  }
+
+  private class DataBufferAndFile implements Closeable {
+    private final PinotDataBuffer _dataBuffer;
+    private final File _file;
+
+    DataBufferAndFile(final PinotDataBuffer buffer, final File file) {
+      _dataBuffer = buffer;
+      _file = file;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      destroyBuffer(_dataBuffer, _file);
+    }
+  }
+
+  void dump() {
+    StringBuilder docIdAsString = new StringBuilder("DocIdBuffer  [ ");
+    for (int i = 0; i < _numValues; i++) {
+      docIdAsString.append(_docIdBuffer.get(i) + ", ");
+    }
+    docIdAsString.append("]");
+    LOGGER.info(docIdAsString.toString());
+
+    StringBuilder valuesAsString = new StringBuilder("ValueBuffer  [ ");
+    for (int i = 0; i < _numValues; i++) {
+      valuesAsString.append(_numberValueBuffer.get(i) + ", ");
+    }
+    valuesAsString.append("] ");
+    LOGGER.info(valuesAsString.toString());
+
+  }
+
+  private PinotDataBuffer createTempBuffer(long size, File mmapFile)
+      throws IOException {
+    return PinotDataBuffer
+        .mapFile(mmapFile, false, 0, size, PinotDataBuffer.NATIVE_ORDER, "RangeIndexCreator: temp buffer");
+  }
+
+  private void destroyBuffer(PinotDataBuffer buffer, File mmapFile)
+      throws IOException {
+    if (buffer != null) {
+      buffer.close();
+      if (mmapFile.exists()) {
+        FileUtils.forceDelete(mmapFile);
+      }
+    }
+  }
+
+  interface NumberValueBuffer {
+
+    void put(int position, Number value);
+
+    Number get(int position);
+
+    int compare(Number val1, Number val2);
+  }
+
+  class IntValueBuffer implements NumberValueBuffer {
+
+    private PinotDataBuffer _buffer;
+
+    IntValueBuffer(PinotDataBuffer buffer) {
+
+      _buffer = buffer;
+    }
+
+    @Override
+    public void put(int position, Number value) {
+      _buffer.putInt(position << 2, value.intValue());
+    }
+
+    @Override
+    public Number get(int position) {
+      return _buffer.getInt(position << 2);
+    }
+
+    @Override
+    public int compare(Number val1, Number val2) {
+      return Integer.compare(val1.intValue(), val2.intValue());
+    }
+  }
+
+  class LongValueBuffer implements NumberValueBuffer {
+
+    private PinotDataBuffer _buffer;
+
+    LongValueBuffer(PinotDataBuffer buffer) {
+
+      _buffer = buffer;
+    }
+
+    @Override
+    public void put(int position, Number value) {
+      _buffer.putInt(position << 3, value.intValue());
+    }
+
+    @Override
+    public Number get(int position) {
+      return _buffer.getInt(position << 3);
+    }
+
+    @Override
+    public int compare(Number val1, Number val2) {
+      return Long.compare(val1.longValue(), val2.longValue());
+    }
+  }
+
+  class FloatValueBuffer implements NumberValueBuffer {
+
+    private PinotDataBuffer _buffer;
+
+    FloatValueBuffer(PinotDataBuffer buffer) {
+
+      _buffer = buffer;
+    }
+
+    @Override
+    public void put(int position, Number value) {
+      _buffer.putInt(position << 2, value.intValue());
+    }
+
+    @Override
+    public Number get(int position) {
+      return _buffer.getInt(position << 2);
+    }
+
+    @Override
+    public int compare(Number val1, Number val2) {
+      return Long.compare(val1.longValue(), val2.longValue());
+    }
+  }
+
+  class DoubleValueBuffer implements NumberValueBuffer {
+
+    private PinotDataBuffer _buffer;
+
+    DoubleValueBuffer(PinotDataBuffer buffer) {
+
+      _buffer = buffer;
+    }
+
+    @Override
+    public void put(int position, Number value) {
+      _buffer.putInt(position << 3, value.intValue());
+    }
+
+    @Override
+    public Number get(int position) {
+      return _buffer.getInt(position << 3);
+    }
+
+    @Override
+    public int compare(Number val1, Number val2) {
+      return Long.compare(val1.longValue(), val2.longValue());
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
index 531c3fc..ab9fbfc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/ColumnIndexContainer.java
@@ -41,6 +41,11 @@ public interface ColumnIndexContainer {
   InvertedIndexReader getInvertedIndex();
 
   /**
+   * Returns the inverted index for the column, or {@code null} if it does not exist.
+   */
+  InvertedIndexReader getRangeIndex();
+
+  /**
    * Returns the dictionary for the column, or {@code null} if it does not exist.
    */
   Dictionary getDictionary();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
index aff965d..eb33185 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/column/PhysicalColumnIndexContainer.java
@@ -33,6 +33,7 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
 import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
 import org.apache.pinot.core.segment.index.readers.BitmapInvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
 import org.apache.pinot.core.segment.index.readers.BloomFilterReader;
 import org.apache.pinot.core.segment.index.readers.BytesDictionary;
 import org.apache.pinot.core.segment.index.readers.DoubleDictionary;
@@ -61,6 +62,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
 
   private final DataFileReader _forwardIndex;
   private final InvertedIndexReader _invertedIndex;
+  private final InvertedIndexReader _rangeIndex;
   private final BaseImmutableDictionary _dictionary;
   private final BloomFilterReader _bloomFilterReader;
   private final NullValueVectorReaderImpl _nullValueVectorReader;
@@ -70,11 +72,13 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       throws IOException {
     String columnName = metadata.getColumnName();
     boolean loadInvertedIndex = false;
+    boolean loadRangeIndex = false;
     boolean loadTextIndex = false;
     boolean loadOnHeapDictionary = false;
     boolean loadBloomFilter = false;
     if (indexLoadingConfig != null) {
       loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
+      loadRangeIndex = indexLoadingConfig.getRangeIndexColumns().contains(columnName);
       loadOnHeapDictionary = indexLoadingConfig.getOnHeapDictionaryColumns().contains(columnName);
       loadBloomFilter = indexLoadingConfig.getBloomFilterColumns().contains(columnName);
       loadTextIndex = indexLoadingConfig.getTextIndexColumns().contains(columnName);
@@ -108,6 +112,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
               new SortedIndexReaderImpl(fwdIndexBuffer, metadata.getCardinality());
           _forwardIndex = sortedIndexReader;
           _invertedIndex = sortedIndexReader;
+          _rangeIndex = null;
           return;
         } else {
           // Unsorted
@@ -124,14 +129,21 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
         _invertedIndex =
             new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
                 metadata.getCardinality());
+        _rangeIndex = null;
+      } else if (loadRangeIndex) {
+        _invertedIndex = null;
+        _rangeIndex =
+            new RangeIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX));
       } else {
         _invertedIndex = null;
+        _rangeIndex = null;
       }
     } else {
       // Raw index
       _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType());
       _dictionary = null;
       _bloomFilterReader = null;
+      _rangeIndex = null;
       if (loadTextIndex) {
         Map<String, Map<String, String>> columnProperties = indexLoadingConfig.getColumnProperties();
         _invertedIndex = new LuceneTextIndexReader(columnName, segmentIndexDir, metadata.getTotalDocs(),
@@ -153,6 +165,11 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   }
 
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return _rangeIndex;
+  }
+
+  @Override
   public BaseImmutableDictionary getDictionary() {
     return _dictionary;
   }
@@ -167,7 +184,8 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
     return _nullValueVectorReader;
   }
 
-  private static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
+  //TODO: move this to a DictionaryLoader class
+  public static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryBuffer, ColumnMetadata metadata,
       boolean loadOnHeap) {
     FieldSpec.DataType dataType = metadata.getDataType();
     if (loadOnHeap) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
index 4fc99cb..fc32487 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
@@ -39,17 +39,20 @@ public abstract class BaseDataSource extends DataSource {
   private final DataFileReader _forwardIndex;
   private final Dictionary _dictionary;
   private final InvertedIndexReader _invertedIndex;
+  private final InvertedIndexReader _rangeIndex;
   private final BloomFilterReader _bloomFilter;
   private final NullValueVectorReader _nullValueVector;
   private final String _operatorName;
 
   public BaseDataSource(DataSourceMetadata dataSourceMetadata, DataFileReader forwardIndex,
       @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
-      @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector, String operatorName) {
+      @Nullable InvertedIndexReader rangeIndex, @Nullable BloomFilterReader bloomFilter,
+      @Nullable NullValueVectorReader nullValueVector, String operatorName) {
     _dataSourceMetadata = dataSourceMetadata;
     _forwardIndex = forwardIndex;
     _dictionary = dictionary;
     _invertedIndex = invertedIndex;
+    _rangeIndex = rangeIndex;
     _bloomFilter = bloomFilter;
     _nullValueVector = nullValueVector;
     _operatorName = operatorName;
@@ -79,6 +82,12 @@ public abstract class BaseDataSource extends DataSource {
 
   @Nullable
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return _rangeIndex;
+  }
+
+  @Nullable
+  @Override
   public BloomFilterReader getBloomFilter() {
     return _bloomFilter;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
index a7009a8..b4b0f5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/ImmutableDataSource.java
@@ -35,7 +35,7 @@ public class ImmutableDataSource extends BaseDataSource {
 
   public ImmutableDataSource(ColumnMetadata columnMetadata, ColumnIndexContainer columnIndexContainer) {
     super(new ImmutableDataSourceMetadata(columnMetadata), columnIndexContainer.getForwardIndex(),
-        columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(),
+        columnIndexContainer.getDictionary(), columnIndexContainer.getInvertedIndex(), columnIndexContainer.getRangeIndex(),
         columnIndexContainer.getBloomFilter(), columnIndexContainer.getNullValueVector(),
         OPERATOR_NAME_PREFIX + columnMetadata.getColumnName());
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
index cf8e8bb..70610da 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/MutableDataSource.java
@@ -40,10 +40,10 @@ public class MutableDataSource extends BaseDataSource {
 
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
       @Nullable PartitionFunction partitionFunction, int partitionId, DataFileReader forwardIndex,
-      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex,
+      @Nullable Dictionary dictionary, @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
       @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
     super(new MutableDataSourceMetadata(fieldSpec, numDocs, numValues, maxNumValuesPerMVEntry, partitionFunction,
-            partitionId), forwardIndex, dictionary, invertedIndex, bloomFilter, nullValueVector,
+            partitionId), forwardIndex, dictionary, invertedIndex, rangeIndex, bloomFilter, nullValueVector,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
index 8b85d73..6dac2ec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java
@@ -46,6 +46,7 @@ public class IndexLoadingConfig {
   private List<String> _sortedColumns = Collections.emptyList();
   private Set<String> _invertedIndexColumns = new HashSet<>();
   private Set<String> _textIndexColumns = new HashSet<>();
+  private Set<String> _rangeIndexColumns = new HashSet<>();
   private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
   private Map<String, String> _noDictionaryConfig = new HashMap<>();
   private Set<String> _varLengthDictionaryColumns = new HashSet<>();
@@ -86,6 +87,11 @@ public class IndexLoadingConfig {
       _invertedIndexColumns.addAll(invertedIndexColumns);
     }
 
+    List<String> rangeIndexColumns = indexingConfig.getRangeIndexColumns();
+    if (rangeIndexColumns != null) {
+      _rangeIndexColumns.addAll(rangeIndexColumns);
+    }
+
     List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
     if (bloomFilterColumns != null) {
       _bloomFilterColumns.addAll(bloomFilterColumns);
@@ -204,6 +210,10 @@ public class IndexLoadingConfig {
   @Nonnull
   public Set<String> getInvertedIndexColumns() {
     return _invertedIndexColumns;
+  }  @Nonnull
+
+  public Set<String> getRangeIndexColumns() {
+    return _rangeIndexColumns;
   }
 
   @Nonnull
@@ -235,6 +245,14 @@ public class IndexLoadingConfig {
   }
 
   /**
+   * For tests only.
+   */
+  @VisibleForTesting
+  public void setRangeIndexColumns(@Nonnull Set<String> rangeIndexColumns) {
+    _rangeIndexColumns = rangeIndexColumns;
+  }
+
+  /**
    * Used directly from text search unit test code since the test code
    * doesn't really have a table config and is directly testing the
    * query execution code of text search using data from generated segments
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
index 564321b..89432ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessor.java
@@ -31,6 +31,7 @@ import org.apache.pinot.core.segment.index.loader.columnminmaxvalue.ColumnMinMax
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandler;
 import org.apache.pinot.core.segment.index.loader.defaultcolumn.DefaultColumnHandlerFactory;
 import org.apache.pinot.core.segment.index.loader.invertedindex.InvertedIndexHandler;
+import org.apache.pinot.core.segment.index.loader.invertedindex.RangeIndexHandler;
 import org.apache.pinot.core.segment.index.loader.invertedindex.TextIndexHandler;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.store.SegmentDirectory;
@@ -99,6 +100,11 @@ public class SegmentPreProcessor implements AutoCloseable {
           new InvertedIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
       invertedIndexHandler.createInvertedIndices();
 
+      // Create column inverted indices according to the index config.
+      RangeIndexHandler rangeIndexHandler =
+          new RangeIndexHandler(_indexDir, _segmentMetadata, _indexLoadingConfig, segmentWriter);
+      rangeIndexHandler.createRangeIndices();
+
       Set<String> textIndexColumns = _indexLoadingConfig.getTextIndexColumns();
       if (textIndexColumns.size() > 0) {
         TextIndexHandler textIndexHandler =
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
new file mode 100644
index 0000000..e7685a7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.loader.invertedindex;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.io.reader.DataFileReader;
+import org.apache.pinot.core.io.reader.SingleColumnMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitMultiValueReader;
+import org.apache.pinot.core.io.reader.impl.v1.FixedBitSingleValueReader;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.column.PhysicalColumnIndexContainer;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.core.segment.index.loader.LoaderUtils;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.BaseImmutableDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.core.segment.store.ColumnIndexType;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RangeIndexHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexHandler.class);
+
+  private final File _indexDir;
+  private final SegmentDirectory.Writer _segmentWriter;
+  private final String _segmentName;
+  private final SegmentVersion _segmentVersion;
+  private final Set<ColumnMetadata> _rangeIndexColumns = new HashSet<>();
+
+  public RangeIndexHandler(@Nonnull File indexDir, @Nonnull SegmentMetadataImpl segmentMetadata,
+      @Nonnull IndexLoadingConfig indexLoadingConfig, @Nonnull SegmentDirectory.Writer segmentWriter) {
+    _indexDir = indexDir;
+    _segmentWriter = segmentWriter;
+    _segmentName = segmentMetadata.getName();
+    _segmentVersion = SegmentVersion.valueOf(segmentMetadata.getVersion());
+
+    // Do not create inverted index for sorted column
+    for (String column : indexLoadingConfig.getRangeIndexColumns()) {
+      ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
+        _rangeIndexColumns.add(columnMetadata);
+      }
+    }
+  }
+
+  public void createRangeIndices()
+      throws IOException {
+    for (ColumnMetadata columnMetadata : _rangeIndexColumns) {
+      createRangeIndexForColumn(columnMetadata);
+    }
+  }
+
+  private void createRangeIndexForColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    //Range index supported only for dictionary encoded columns for now
+    if (!columnMetadata.hasDictionary()) {
+      LOGGER.warn("Skipping creation of Range index for column:{}. It's only supported for dictionary encoded columns",
+          columnMetadata.getColumnName());
+
+      return;
+    }
+    String column = columnMetadata.getColumnName();
+
+    File inProgress = new File(_indexDir, column + ".range.inprogress");
+    File rangeIndexFile = new File(_indexDir, column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION);
+
+    if (!inProgress.exists()) {
+      // Marker file does not exist, which means last run ended normally.
+
+      if (_segmentWriter.hasIndexFor(column, ColumnIndexType.RANGE_INDEX)) {
+        // Skip creating range index if already exists.
+
+        LOGGER.info("Found Range index for segment: {}, column: {}", _segmentName, column);
+        return;
+      }
+
+      // Create a marker file.
+      FileUtils.touch(inProgress);
+    } else {
+      // Marker file exists, which means last run gets interrupted.
+      // Remove inverted index if exists.
+      // For v1 and v2, it's the actual inverted index. For v3, it's the temporary inverted index.
+      FileUtils.deleteQuietly(rangeIndexFile);
+    }
+
+    // Create new inverted index for the column.
+    LOGGER.info("Creating new range index for segment: {}, column: {}", _segmentName, column);
+    int numDocs = columnMetadata.getTotalDocs();
+
+    PinotDataBuffer dictBuffer = _segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.DICTIONARY);
+    BaseImmutableDictionary dictionary = PhysicalColumnIndexContainer.loadDictionary(dictBuffer, columnMetadata, false);
+    handleDictionaryBasedColumn(dictionary, columnMetadata, numDocs);
+
+    // For v3, write the generated inverted index file into the single file and remove it.
+    if (_segmentVersion == SegmentVersion.v3) {
+      LoaderUtils.writeIndexToV3Format(_segmentWriter, column, rangeIndexFile, ColumnIndexType.RANGE_INDEX);
+    }
+
+    // Delete the marker file.
+    FileUtils.deleteQuietly(inProgress);
+
+    LOGGER.info("Created inverted index for segment: {}, column: {}", _segmentName, column);
+  }
+
+  private void handleDictionaryBasedColumn(BaseImmutableDictionary dictionary, ColumnMetadata columnMetadata,
+      int numDocs)
+      throws IOException {
+    try (RangeIndexCreator creator = new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+        FieldSpec.DataType.INT, -1, -1, numDocs, columnMetadata.getTotalNumberOfEntries())) {
+      try (DataFileReader fwdIndex = getForwardIndexReader(columnMetadata, _segmentWriter)) {
+        if (columnMetadata.isSingleValue()) {
+          // Single-value column.
+          FixedBitSingleValueReader svFwdIndex = (FixedBitSingleValueReader) fwdIndex;
+          for (int i = 0; i < numDocs; i++) {
+            creator.add(svFwdIndex.getInt(i));
+          }
+        } else {
+          // Multi-value column.
+          SingleColumnMultiValueReader mvFwdIndex = (SingleColumnMultiValueReader) fwdIndex;
+          int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()];
+          for (int i = 0; i < numDocs; i++) {
+            int length = mvFwdIndex.getIntArray(i, dictIds);
+            creator.add(dictIds, length);
+          }
+        }
+        creator.seal();
+      }
+    }
+  }
+
+  private DataFileReader getForwardIndexReader(ColumnMetadata columnMetadata, SegmentDirectory.Writer segmentWriter)
+      throws IOException {
+    PinotDataBuffer buffer = segmentWriter.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
+    int numRows = columnMetadata.getTotalDocs();
+    int numBitsPerValue = columnMetadata.getBitsPerElement();
+    if (columnMetadata.isSingleValue()) {
+      return new FixedBitSingleValueReader(buffer, numRows, numBitsPerValue);
+    } else {
+      return new FixedBitMultiValueReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(), numBitsPerValue);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
index 8ee589a..d01406c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadata.java
@@ -96,6 +96,8 @@ public interface SegmentMetadata {
 
   String getBitmapInvertedIndexFileName(String column);
 
+  String getBitmapRangeIndexFileName(String column);
+
   String getBloomFilterFileName(String column);
 
   String getNullValueVectorFileName(String column);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 5b58f9e..3bbdcdd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -452,6 +452,11 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   }
 
   @Override
+  public String getBitmapRangeIndexFileName(String column) {
+    return column + V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+  }
+
+  @Override
   public String getBloomFilterFileName(String column) {
     return column + V1Constants.Indexes.BLOOM_FILTER_FILE_EXTENSION;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java
new file mode 100644
index 0000000..b4d1a77
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/RangeIndexReader.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.readers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> {
+  public static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexReader.class);
+
+  private final PinotDataBuffer _buffer;
+  private final int _version;
+  private final FieldSpec.DataType _valueType;
+  private final int _numRanges;
+  final long _bitmapIndexOffset;
+  private final Number[] _rangeStartArray;
+  private final Number _lastRangeEnd;
+
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps = null;
+
+  /**
+   * Constructs an inverted index with the specified size.
+   * @param indexDataBuffer data buffer for the inverted index.
+   */
+  public RangeIndexReader(PinotDataBuffer indexDataBuffer) {
+    long offset = 0;
+    _buffer = indexDataBuffer;
+    //READER VERSION
+    _version = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+
+    //READ THE VALUE TYPE (INT, LONG, DOUBLE, FLOAT)
+    int valueTypeBytesLength = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+    byte[] valueTypeBytes = new byte[valueTypeBytesLength];
+    _buffer.copyTo(offset, valueTypeBytes);
+    offset += valueTypeBytesLength;
+    _valueType = FieldSpec.DataType.valueOf(new String(valueTypeBytes));
+
+    //READ THE NUMBER OF RANGES
+    _numRanges = _buffer.getInt(offset);
+    offset += Integer.BYTES;
+    long rangeArrayStartOffset = offset;
+
+    _rangeStartArray = new Number[_numRanges];
+    final long lastOffset = _buffer.getLong(offset + (_numRanges + 1) * _valueType.size() + _numRanges * Long.BYTES);
+
+    _bitmapIndexOffset = offset + (_numRanges + 1) * _valueType.size();
+
+    Preconditions.checkState(lastOffset == _buffer.size(),
+        "The last offset should be equal to buffer size! Current lastOffset: " + lastOffset + ", buffer size: "
+            + _buffer.size());
+    switch (_valueType) {
+      case INT:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getInt(rangeArrayStartOffset + i * Integer.BYTES);
+        }
+        _lastRangeEnd = _buffer.getInt(rangeArrayStartOffset + _numRanges * Integer.BYTES);
+        break;
+      case LONG:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getLong(rangeArrayStartOffset + i * Long.BYTES);
+        }
+        _lastRangeEnd = _buffer.getLong(rangeArrayStartOffset + _numRanges * Long.BYTES);
+        break;
+      case FLOAT:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getFloat(rangeArrayStartOffset + i * Float.BYTES);
+        }
+        _lastRangeEnd = _buffer.getFloat(rangeArrayStartOffset + _numRanges * Float.BYTES);
+        break;
+      case DOUBLE:
+        for (int i = 0; i < _numRanges; i++) {
+          _rangeStartArray[i] = _buffer.getDouble(rangeArrayStartOffset + i * Double.BYTES);
+        }
+        _lastRangeEnd = _buffer.getDouble(rangeArrayStartOffset + _numRanges * Double.BYTES);
+        break;
+      default:
+        throw new RuntimeException("Range Index Unsupported for dataType:" + _valueType);
+    }
+  }
+
+  @VisibleForTesting
+  public Number[] getRangeStartArray() {
+    return _rangeStartArray;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ImmutableRoaringBitmap getDocIds(int rangeId) {
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
+    // Return the bitmap if it's still on heap
+    if (_bitmaps != null) {
+      bitmapArrayReference = _bitmaps.get();
+      if (bitmapArrayReference != null) {
+        SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId];
+        if (bitmapReference != null) {
+          ImmutableRoaringBitmap value = bitmapReference.get();
+          if (value != null) {
+            return value;
+          }
+        }
+      } else {
+        bitmapArrayReference = new SoftReference[_numRanges];
+        _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+      }
+    } else {
+      bitmapArrayReference = new SoftReference[_numRanges];
+      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+    }
+    synchronized (this) {
+      ImmutableRoaringBitmap value;
+      if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) {
+        value = buildRoaringBitmapForIndex(rangeId);
+        bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value);
+      } else {
+        value = bitmapArrayReference[rangeId].get();
+      }
+      return value;
+    }
+  }
+
+  @Override
+  public ImmutableRoaringBitmap getDocIds(Object value) {
+    // This should not be called from anywhere. If it happens, there is a bug
+    // and that's why we throw illegal state exception
+    throw new IllegalStateException("bitmap inverted index reader supports lookup only on dictionary id");
+  }
+
+  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) {
+    final long currentOffset = getOffset(rangeId);
+    final long nextOffset = getOffset(rangeId + 1);
+    final int bufferLength = (int) (nextOffset - currentOffset);
+
+    // Slice the buffer appropriately for Roaring Bitmap
+    ByteBuffer bb = _buffer.toDirectByteBuffer(currentOffset, bufferLength);
+    return new ImmutableRoaringBitmap(bb);
+  }
+
+  private long getOffset(final int rangeId) {
+    return _buffer.getLong(_bitmapIndexOffset + rangeId * Long.BYTES);
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _buffer.close();
+  }
+
+  /**
+   * Find the rangeIndex that the value falls in
+   * Note this assumes that the value is in one of the range.
+   * @param value
+   * @return
+   */
+  public int findRangeId(int value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].intValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.intValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+
+  public int findRangeId(long value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].longValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.longValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+
+  public int findRangeId(float value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].floatValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.floatValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+
+  public int findRangeId(double value) {
+    for (int i = 0; i < _rangeStartArray.length; i++) {
+      if (value < _rangeStartArray[i].doubleValue()) {
+        return i - 1;
+      }
+    }
+    if (value <= _lastRangeEnd.doubleValue()) {
+      return _rangeStartArray.length - 1;
+    }
+    return -1;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
index bdb3b6a..dcd21df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/ColumnIndexType.java
@@ -24,7 +24,8 @@ public enum ColumnIndexType {
   INVERTED_INDEX("inverted_index"),
   BLOOM_FILTER("bloom_filter"),
   NULLVALUE_VECTOR("nullvalue_vector"),
-  TEXT_INDEX("text_index");
+  TEXT_INDEX("text_index"),
+  RANGE_INDEX("range_index");
 
   private final String indexName;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
index d5e8311..2b1328f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/store/FilePerIndexDirectory.java
@@ -123,6 +123,9 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
       case INVERTED_INDEX:
         filename = metadata.getBitmapInvertedIndexFileName(column);
         break;
+      case RANGE_INDEX:
+        filename = metadata.getBitmapRangeIndexFileName(column);
+        break;
       case BLOOM_FILTER:
         filename = metadata.getBloomFilterFileName(column);
         break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
index de2af42..c91500d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -52,6 +52,11 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
   }
 
   @Override
+  public InvertedIndexReader getRangeIndex() {
+    return null;
+  }
+
+  @Override
   public Dictionary getDictionary() {
     return _dictionary;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
index da13528..0e6e077 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/v2/store/StarTreeDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.io.reader.SingleColumnSingleValueReader;
 import org.apache.pinot.core.segment.index.datasource.BaseDataSource;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -33,7 +34,7 @@ public class StarTreeDataSource extends BaseDataSource {
 
   public StarTreeDataSource(FieldSpec fieldSpec, int numDocs, SingleColumnSingleValueReader forwardIndex,
       @Nullable Dictionary dictionary) {
-    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null, null,
+    super(new StarTreeDataSourceMetadata(fieldSpec, numDocs), forwardIndex, dictionary, null, null,null, null,
         OPERATOR_NAME_PREFIX + fieldSpec.getName());
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
index 9e85b24..a0a5b95 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/RealtimeNoDictionaryTest.java
@@ -110,17 +110,17 @@ public class RealtimeNoDictionaryTest {
 
     Map<String, DataSource> dataSourceBlock = new HashMap<>();
     dataSourceBlock.put(INT_COL_NAME,
-        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null, null));
+        new MutableDataSource(intSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, intRawIndex, null, null, null,null, null));
     dataSourceBlock.put(LONG_COL_NAME,
-        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null, null));
+        new MutableDataSource(longSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, longRawIndex, null, null, null,null, null));
     dataSourceBlock.put(FLOAT_COL_NAME,
-        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null, null));
+        new MutableDataSource(floatSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, floatRawIndex, null, null, null,null, null));
     dataSourceBlock.put(DOUBLE_COL_NAME,
-        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null, null));
+        new MutableDataSource(doubleSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, doubleRawIndex, null, null, null,null, null));
     dataSourceBlock.put(STRING_COL_NAME,
-        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null, null));
+        new MutableDataSource(stringSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, stringRawIndex, null, null, null,null, null));
     dataSourceBlock.put(BYTES_COL_NAME,
-        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null, null));
+        new MutableDataSource(bytesSpec, NUM_ROWS, NUM_ROWS, 0, null, 0, bytesRawIndex, null, null, null,null, null));
 
     return new DataFetcher(dataSourceBlock);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
new file mode 100644
index 0000000..0a4c171
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import com.google.common.base.Preconditions;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
+import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+
+
+/**
+ * Class for testing Range index.
+ */
+public class RangeIndexCreatorTest {
+
+  @Test
+  public void testInt()
+      throws Exception {
+    File indexDir = new File(System.getProperty("java.io.tmpdir") + "/testRangeIndex");
+    indexDir.mkdirs();
+    FieldSpec fieldSpec = new MetricFieldSpec();
+    fieldSpec.setDataType(FieldSpec.DataType.INT);
+    String columnName = "latency";
+    fieldSpec.setName(columnName);
+    int cardinality = 20;
+    int numDocs = 1000;
+    int numValues = 1000;
+    RangeIndexCreator creator =
+        new RangeIndexCreator(indexDir, fieldSpec, FieldSpec.DataType.INT, -1, -1, numDocs, numValues);
+    Random r = new Random();
+    Number[] values = new Number[numValues];
+    for (int i = 0; i < numDocs; i++) {
+      int val = r.nextInt(cardinality);
+      creator.add(val);
+      values[i] = val;
+    }
+    creator.seal();
+
+    File rangeIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    //TEST THE BUFFER FORMAT
+
+    testRangeIndexBufferFormat(values, rangeIndexFile);
+
+    //TEST USING THE READER
+    PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile);
+    RangeIndexReader rangeIndexReader = new RangeIndexReader(pinotDataBuffer);
+    Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
+    for (int rangeId = 0; rangeId < rangeStartArray.length; rangeId++) {
+      ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
+      for (int docId : bitmap.toArray()) {
+        checkInt(rangeStartArray, rangeId, values, docId);
+      }
+    }
+  }
+
+  private void checkInt(Number[] rangeStartArray, int rangeId, Number[] values, int docId) {
+    if (rangeId != rangeStartArray.length - 1) {
+      Assert.assertTrue(
+          rangeStartArray[rangeId].intValue() <= values[docId].intValue() && values[docId].intValue() < rangeStartArray[
+              rangeId + 1].intValue(), "rangestart:" + rangeStartArray[rangeId] + " value:" + values[docId]);
+    } else {
+      Assert.assertTrue(rangeStartArray[rangeId].intValue() <= values[docId].intValue(),
+          "rangestart:" + rangeStartArray[rangeId] + " value:" + values[docId]);
+    }
+  }
+
+  private void testRangeIndexBufferFormat(Number[] values, File rangeIndexFile)
+      throws IOException {
+    DataInputStream dis = new DataInputStream(new FileInputStream(rangeIndexFile));
+    int version = dis.readInt();
+    int valueTypeBytesLength = dis.readInt();
+
+    byte[] valueTypeBytes = new byte[valueTypeBytesLength];
+    dis.read(valueTypeBytes);
+    String name = new String(valueTypeBytes);
+    FieldSpec.DataType dataType = FieldSpec.DataType.valueOf(name);
+
+    int numRanges = dis.readInt();
+
+    Number[] rangeStart = new Number[numRanges];
+    Number rangeEnd;
+
+    switch (dataType) {
+      case INT:
+        for (int i = 0; i < numRanges; i++) {
+          rangeStart[i] = dis.readInt();
+        }
+        rangeEnd = dis.readInt();
+        break;
+      case LONG:
+        for (int i = 0; i < numRanges; i++) {
+          rangeStart[i] = dis.readLong();
+        }
+        rangeEnd = dis.readLong();
+        break;
+      case FLOAT:
+        for (int i = 0; i < numRanges; i++) {
+          rangeStart[i] = dis.readFloat();
+        }
+        rangeEnd = dis.readFloat();
+        break;
+      case DOUBLE:
+        for (int i = 0; i < numRanges; i++) {
+          rangeStart[i] = dis.readDouble();
+        }
+        rangeEnd = dis.readDouble();
+        break;
+    }
+
+    long[] rangeBitmapOffsets = new long[numRanges + 1];
+    for (int i = 0; i <= numRanges; i++) {
+      rangeBitmapOffsets[i] = dis.readLong();
+    }
+    ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numRanges];
+    for (int i = 0; i < numRanges; i++) {
+      long serializedBitmapLength;
+      serializedBitmapLength = rangeBitmapOffsets[i + 1] - rangeBitmapOffsets[i];
+      byte[] bytes = new byte[(int) serializedBitmapLength];
+      dis.read(bytes, 0, (int) serializedBitmapLength);
+      bitmaps[i] = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes));
+      for (int docId : bitmaps[i].toArray()) {
+        if (i != numRanges - 1) {
+          Assert.assertTrue(
+              rangeStart[i].intValue() <= values[docId].intValue() && values[docId].intValue() < rangeStart[i + 1]
+                  .intValue(), "rangestart:" + rangeStart[i] + " value:" + values[docId]);
+        } else {
+          Assert.assertTrue(rangeStart[i].intValue() <= values[docId].intValue());
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index ced3014..4e74c42 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -71,6 +71,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   private static final List<String> DEFAULT_RAW_INDEX_COLUMNS =
       Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay", "CRSDepTime");
   private static final List<String> DEFAULT_BLOOM_FILTER_COLUMNS = Arrays.asList("FlightNum", "Origin");
+  private static final List<String> DEFAULT_RANGE_INDEX_COLUMNS = Arrays.asList("", "Origin");
 
   protected final File _tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
   protected final File _avroDir = new File(_tempDir, "avroDir");
@@ -172,6 +173,11 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   @Nullable
+  protected List<String> getRangeIndexColumns() {
+    return DEFAULT_RANGE_INDEX_COLUMNS;
+  }
+
+  @Nullable
   protected List<String> getRawIndexColumns() {
     return DEFAULT_RAW_INDEX_COLUMNS;
   }
@@ -420,7 +426,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     addRealtimeTable(tableName, useLLC, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
         getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
-        getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
+        getBloomFilterIndexColumns(), getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(),
         numReplicas);
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 3c0ab89..19fe79c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -293,40 +293,43 @@ public abstract class ClusterTest extends ControllerTest {
 
   protected void addOfflineTable(String tableName, SegmentVersion segmentVersion)
       throws Exception {
-    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null);
+    addOfflineTable(tableName, null, null, null, null, null, segmentVersion, null, null, null, null, null, null);
   }
 
   protected void addOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
-      String sortedColumn)
+      List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
+      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
       throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn, "daily");
+            invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
+            sortedColumn, "daily");
     sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString());
   }
 
   protected void updateOfflineTable(String tableName, String timeColumnName, String timeType, String brokerTenant,
       String serverTenant, String loadMode, SegmentVersion segmentVersion, List<String> invertedIndexColumns,
-      List<String> bloomFilterColumns, TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig,
-      String sortedColumn)
+      List<String> bloomFilterColumns, List<String> rangeIndexColumns, TableTaskConfig taskConfig,
+      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn)
       throws Exception {
     TableConfig tableConfig =
         getOfflineTableConfig(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, segmentVersion,
-            invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn, "daily");
+            invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig,
+            sortedColumn, "daily");
     sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName), tableConfig.toJsonString());
   }
 
   private static TableConfig getOfflineTableConfig(String tableName, String timeColumnName, String timeType,
       String brokerTenant, String serverTenant, String loadMode, SegmentVersion segmentVersion,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, TableTaskConfig taskConfig,
-      SegmentPartitionConfig segmentPartitionConfig, String sortedColumn, String segmentPushFrequency) {
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+      TableTaskConfig taskConfig, SegmentPartitionConfig segmentPartitionConfig, String sortedColumn,
+      String segmentPushFrequency) {
     return new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).setTimeColumnName(timeColumnName)
         .setTimeType(timeType).setSegmentPushFrequency(segmentPushFrequency).setNumReplicas(1)
         .setBrokerTenant(brokerTenant).setServerTenant(serverTenant).setLoadMode(loadMode)
         .setSegmentVersion(segmentVersion.toString()).setInvertedIndexColumns(invertedIndexColumns)
-        .setBloomFilterColumns(bloomFilterColumns).setTaskConfig(taskConfig)
+        .setBloomFilterColumns(bloomFilterColumns).setRangeIndexColumns(rangeIndexColumns).setTaskConfig(taskConfig)
         .setSegmentPartitionConfig(segmentPartitionConfig).setSortedColumn(sortedColumn).build();
   }
 
@@ -377,19 +380,19 @@ public abstract class ClusterTest extends ControllerTest {
   protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
       String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName)
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName)
       throws Exception {
     addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
         timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
+        bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1);
   }
 
   protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
       String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName, int numReplicas)
       throws Exception {
     addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile,
         timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
@@ -480,14 +483,15 @@ public abstract class ClusterTest extends ControllerTest {
   protected void addHybridTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl,
       String kafkaTopic, int realtimeSegmentFlushSize, File avroFile, String timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn,
-      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName, SegmentPartitionConfig segmentPartitionConfig)
+      List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> rangeIndexColumns,
+      List<String> noDictionaryColumns, TableTaskConfig taskConfig, String streamConsumerFactoryName,
+      SegmentPartitionConfig segmentPartitionConfig)
       throws Exception {
     addOfflineTable(tableName, timeColumnName, timeType, brokerTenant, serverTenant, loadMode, SegmentVersion.v1,
-        invertedIndexColumns, bloomFilterColumns, taskConfig, segmentPartitionConfig, sortedColumn);
+        invertedIndexColumns, bloomFilterColumns, rangeIndexColumns, taskConfig, segmentPartitionConfig, sortedColumn);
     addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushSize, avroFile,
         timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns,
-        bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
+        bloomFilterColumns, rangeIndexColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName);
   }
 
   protected JsonNode getDebugInfo(final String uri)
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index bb343f5..6050314 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -169,7 +169,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
    */
   private void setupOfflineTable(String table)
       throws Exception {
-    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null);
+    addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null, null, null, null,
+        null);
   }
 
   /**
@@ -194,7 +195,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     String timeType = outgoingTimeUnit.toString();
 
     addOfflineTable(tableName, timeColumnName, timeType, TENANT_NAME, TENANT_NAME, null, SegmentVersion.v1, null, null,
-        null, null, null);
+        null, null, null, null);
 
     ExecutorService executor = Executors.newCachedThreadPool();
     ClusterIntegrationTestUtils
@@ -225,8 +226,8 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
 
     addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
         getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
-        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
-        getTaskConfig(), getStreamConsumerFactoryClassName());
+        getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+        getRangeIndexColumns(), getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName());
   }
 
   @Override
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index aae6005..e9e8e47 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -139,7 +139,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
-        TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
+        TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRangeIndexColumns(),
         getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index dba6e22..8935966 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -300,7 +300,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       String timeType = outgoingTimeUnit.toString();
       addHybridTable(_tableName, _useLlc, KAFKA_BROKER, KAFKA_ZK_STR, getKafkaTopic(), getRealtimeSegmentFlushSize(),
           _realtimeAvroFiles.get(0), timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME, "MMAP",
-          _sortedColumn, _invertedIndexColumns, null, null, null, getStreamConsumerFactoryClassName(), null);
+          _sortedColumn, _invertedIndexColumns, null, null, null, null, getStreamConsumerFactoryClassName(), null);
 
       // Upload all segments
       uploadSegments(getTableName(), _tarDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 5ccc832..db3bd93 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -72,6 +72,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String TEST_UPDATED_INVERTED_INDEX_QUERY =
       "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305";
 
+  // For inverted index triggering test
+  private static final List<String> UPDATED_RANGE_INDEX_COLUMNS = Arrays.asList("DivActualElapsedTime");
+  private static final String TEST_UPDATED_RANGE_INDEX_QUERY =
+      "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime > 305";
+
   private static final List<String> UPDATED_BLOOM_FILTER_COLUMNS = Collections.singletonList("Carrier");
   private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'";
 
@@ -127,7 +132,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Create the table
     addOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, getInvertedIndexColumns(),
-        getBloomFilterIndexColumns(), getTaskConfig(), null, null);
+        getBloomFilterIndexColumns(), getRangeIndexColumns(), getTaskConfig(), null, null);
 
     // Upload all segments
     uploadSegments(getTableName(), _tarDir);
@@ -282,7 +287,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1,
-        UPDATED_INVERTED_INDEX_COLUMNS, null, getTaskConfig(), null, null);
+        UPDATED_INVERTED_INDEX_COLUMNS, null, null, getTaskConfig(), null, null);
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
@@ -307,7 +312,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Update table config and trigger reload
     updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null,
-        UPDATED_BLOOM_FILTER_COLUMNS, getTaskConfig(), null, null);
+        UPDATED_BLOOM_FILTER_COLUMNS, null, getTaskConfig(), null, null);
 
     sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
 
@@ -320,6 +325,36 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
+    }, 600_000L, "Failed to generate bloom index");
+  }
+
+  @Test
+  public void testRangeIndexTriggering()
+      throws Exception {
+    final long numTotalDocs = getCountStarResult();
+    JsonNode queryResponse = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+    System.out.println("Before queryResponse = " + queryResponse);
+    assertEquals(queryResponse.get("numEntriesScannedInFilter").asLong(), numTotalDocs);
+    long beforeCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+
+    // Update table config and trigger reload
+    updateOfflineTable(getTableName(), null, null, null, null, getLoadMode(), SegmentVersion.v1, null, null,
+        UPDATED_RANGE_INDEX_COLUMNS, getTaskConfig(), null, null);
+
+    sendPostRequest(_controllerBaseApiUrl + "/tables/mytable/segments/reload?type=offline", null);
+
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse1 = postQuery(TEST_UPDATED_RANGE_INDEX_QUERY);
+        System.out.println("After queryResponse = " + queryResponse);
+        assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs);
+        long afterCount = queryResponse.get("aggregationResults").get(0).get("value").asLong();
+        //we should be scanning less than numTotalDocs with index enabled.
+        //In the current implementation its 8785, but it
+        return beforeCount == afterCount  && queryResponse1.get("numEntriesScannedInFilter").asLong() < numTotalDocs;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }, 600_000L, "Failed to generate inverted index");
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 6c849dc..3416164 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -84,9 +84,11 @@ public class SimpleMinionClusterIntegrationTest extends ClusterTest {
     // Add 3 offline tables, where 2 of them have TestTask enabled
     TableTaskConfig taskConfig =
         new TableTaskConfig(Collections.singletonMap(TestTaskGenerator.TASK_TYPE, Collections.emptyMap()));
-    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
-    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, taskConfig, null, null);
-    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null);
+    addOfflineTable(TABLE_NAME_1, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
+        null);
+    addOfflineTable(TABLE_NAME_2, null, null, null, null, null, SegmentVersion.v1, null, null, null, taskConfig, null,
+        null);
+    addOfflineTable(TABLE_NAME_3, null, null, null, null, null, SegmentVersion.v1, null, null, null, null, null, null);
 
     _helixTaskResourceManager = _controllerStarter.getHelixTaskResourceManager();
     _taskManager = _controllerStarter.getTaskManager();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
index 162c520..b2e0ce2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/IndexingConfig.java
@@ -26,6 +26,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 
 public class IndexingConfig extends BaseJsonConfig {
   private List<String> _invertedIndexColumns;
+  private List<String> _rangeIndexColumns;
   private boolean _autoGeneratedInvertedIndex;
   private boolean _createInvertedIndexDuringSegmentGeneration;
   private List<String> _sortedColumn;
@@ -59,6 +60,14 @@ public class IndexingConfig extends BaseJsonConfig {
     _invertedIndexColumns = invertedIndexColumns;
   }
 
+  public List<String> getRangeIndexColumns() {
+    return _rangeIndexColumns;
+  }
+
+  public void setRangeIndexColumns(List<String> rangeIndexColumns) {
+    _rangeIndexColumns = rangeIndexColumns;
+  }
+
   public boolean isAutoGeneratedInvertedIndex() {
     return _autoGeneratedInvertedIndex;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index f86c807..63cc43c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -81,6 +81,7 @@ public class TableConfigBuilder {
   private List<String> _noDictionaryColumns;
   private List<String> _onHeapDictionaryColumns;
   private List<String> _bloomFilterColumns;
+  private List<String> _rangeIndexColumns;
   private Map<String, String> _streamConfigs;
   private SegmentPartitionConfig _segmentPartitionConfig;
 
@@ -229,6 +230,11 @@ public class TableConfigBuilder {
     return this;
   }
 
+  public TableConfigBuilder setRangeIndexColumns(List<String> rangeIndexColumns) {
+    _rangeIndexColumns = rangeIndexColumns;
+    return this;
+  }
+
   public TableConfigBuilder setStreamConfigs(Map<String, String> streamConfigs) {
     Preconditions.checkState(_tableType == TableType.REALTIME);
     _streamConfigs = streamConfigs;
@@ -314,6 +320,7 @@ public class TableConfigBuilder {
     indexingConfig.setNoDictionaryColumns(_noDictionaryColumns);
     indexingConfig.setOnHeapDictionaryColumns(_onHeapDictionaryColumns);
     indexingConfig.setBloomFilterColumns(_bloomFilterColumns);
+    indexingConfig.setRangeIndexColumns(_rangeIndexColumns);
     indexingConfig.setStreamConfigs(_streamConfigs);
     indexingConfig.setSegmentPartitionConfig(_segmentPartitionConfig);
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
index f93ee87..226aef4 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/scan/query/RangePredicateFilter.java
@@ -80,4 +80,12 @@ public class RangePredicateFilter implements PredicateFilter {
     }
     return false;
   }
+
+  public int getStartIndex() {
+    return _startIndex;
+  }
+
+  public int getEndIndex() {
+    return _endIndex;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org