You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/09/17 02:20:52 UTC

[pinot] branch master updated: refactor RangeIndexReader to make it easier to evolve its implementation (#7435)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dc78c79  refactor RangeIndexReader to make it easier to evolve its implementation (#7435)
dc78c79 is described below

commit dc78c79076855ea0b51f0a9c15cea7e4a9a2ec33
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Fri Sep 17 03:20:37 2021 +0100

    refactor RangeIndexReader to make it easier to evolve its implementation (#7435)
---
 .../filter/RangeIndexBasedFilterOperator.java      | 106 ++---
 .../indexsegment/mutable/MutableSegmentImpl.java   |   6 +-
 .../creator/impl/inv/RangeIndexCreator.java        |   6 +-
 .../index/column/PhysicalColumnIndexContainer.java |   9 +-
 .../segment/index/datasource/BaseDataSource.java   |   7 +-
 .../index/datasource/MutableDataSource.java        |   3 +-
 ...eIndexReader.java => RangeIndexReaderImpl.java} | 159 ++++++--
 .../virtualcolumn/VirtualColumnIndexContainer.java |   3 +-
 .../index/creator/RangeIndexCreatorTest.java       | 429 +++++++++++++++------
 .../pinot/segment/spi/datasource/DataSource.java   |   3 +-
 .../spi/index/column/ColumnIndexContainer.java     |   3 +-
 .../segment/spi/index/reader/RangeIndexReader.java | 120 ++++++
 12 files changed, 639 insertions(+), 215 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
index 058d96c..008d9e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
@@ -28,8 +28,8 @@ import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFa
 import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.IntRawValueBasedRangePredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.LongRawValueBasedRangePredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator;
-import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReader;
 import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
@@ -51,74 +51,78 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator {
 
   @Override
   protected FilterBlock getNextBlock() {
-    RangeIndexReader rangeIndexReader = (RangeIndexReader) _dataSource.getRangeIndex();
+    @SuppressWarnings("unchecked")
+    RangeIndexReader<ImmutableRoaringBitmap> rangeIndexReader =
+        (RangeIndexReader<ImmutableRoaringBitmap>) _dataSource.getRangeIndex();
     assert rangeIndexReader != null;
 
+    ImmutableRoaringBitmap matches;
+    // if the implementation cannot match the entire query exactly, it will
+    // yield partial matches, which need to be verified by scanning. If it
+    // can answer the query exactly, this will be null.
+    ImmutableRoaringBitmap partialMatches;
     int firstRangeId;
     int lastRangeId;
     if (_rangePredicateEvaluator instanceof SortedDictionaryBasedRangePredicateEvaluator) {
-      firstRangeId = rangeIndexReader
-          .findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId());
       // NOTE: End dictionary id is exclusive in OfflineDictionaryBasedRangePredicateEvaluator.
-      lastRangeId = rangeIndexReader
-          .findRangeId(((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1);
+      int startDictId = ((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId();
+      int endDictId = ((SortedDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1;
+      matches = rangeIndexReader.getMatchingDocIds(startDictId, endDictId);
+      partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(startDictId, endDictId);
     } else {
       switch (_rangePredicateEvaluator.getDataType()) {
-        case INT:
-          firstRangeId = rangeIndexReader
-              .findRangeId(((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
-          lastRangeId = rangeIndexReader
-              .findRangeId(((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+        case INT: {
+          int lowerBound = ((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound();
+          int upperBound = ((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound();
+          matches = rangeIndexReader.getMatchingDocIds(lowerBound, upperBound);
+          partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(lowerBound, upperBound);
           break;
-        case LONG:
-          firstRangeId = rangeIndexReader
-              .findRangeId(((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
-          lastRangeId = rangeIndexReader
-              .findRangeId(((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+        }
+        case LONG: {
+          long lowerBound = ((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound();
+          long upperBound = ((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound();
+          matches = rangeIndexReader.getMatchingDocIds(lowerBound, upperBound);
+          partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(lowerBound, upperBound);
           break;
-        case FLOAT:
-          firstRangeId = rangeIndexReader
-              .findRangeId(((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
-          lastRangeId = rangeIndexReader
-              .findRangeId(((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+        }
+        case FLOAT: {
+          float lowerBound = ((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound();
+          float upperBound = ((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound();
+          matches = rangeIndexReader.getMatchingDocIds(lowerBound, upperBound);
+          partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(lowerBound, upperBound);
           break;
-        case DOUBLE:
-          firstRangeId = rangeIndexReader
-              .findRangeId(((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
-          lastRangeId = rangeIndexReader
-              .findRangeId(((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+        }
+        case DOUBLE: {
+          double lowerBound = ((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound();
+          double upperBound = ((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound();
+          matches = rangeIndexReader.getMatchingDocIds(lowerBound, upperBound);
+          partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(lowerBound, upperBound);
           break;
+        }
         default:
           throw new IllegalStateException("String and Bytes data type not supported for Range Indexing");
       }
     }
-
-    // Need to scan the first and last range as they might be partially matched
-    // TODO: Detect fully matched first and last range
-    ImmutableRoaringBitmap docIdsToScan;
-    if (firstRangeId == lastRangeId) {
-      docIdsToScan = rangeIndexReader.getDocIds(firstRangeId);
+    // this branch is likely until RangeIndexReader reimplemented and enabled by default
+    if (partialMatches != null) {
+      // Need to scan the first and last range as they might be partially matched
+      ScanBasedFilterOperator scanBasedFilterOperator =
+          new ScanBasedFilterOperator(_rangePredicateEvaluator, _dataSource, _numDocs);
+      FilterBlockDocIdSet scanBasedDocIdSet = scanBasedFilterOperator.getNextBlock().getBlockDocIdSet();
+      MutableRoaringBitmap docIds = ((ScanBasedDocIdIterator) scanBasedDocIdSet.iterator()).applyAnd(partialMatches);
+      if (matches != null) {
+        docIds.or(matches);
+      }
+      return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs) {
+        // Override this method to reflect the entries scanned
+        @Override
+        public long getNumEntriesScannedInFilter() {
+          return scanBasedDocIdSet.getNumEntriesScannedInFilter();
+        }
+      });
     } else {
-      docIdsToScan =
-          ImmutableRoaringBitmap.or(rangeIndexReader.getDocIds(firstRangeId), rangeIndexReader.getDocIds(lastRangeId));
-    }
-    ScanBasedFilterOperator scanBasedFilterOperator =
-        new ScanBasedFilterOperator(_rangePredicateEvaluator, _dataSource, _numDocs);
-    FilterBlockDocIdSet scanBasedDocIdSet = scanBasedFilterOperator.getNextBlock().getBlockDocIdSet();
-    MutableRoaringBitmap docIds = ((ScanBasedDocIdIterator) scanBasedDocIdSet.iterator()).applyAnd(docIdsToScan);
-
-    // Ranges in the middle of first and last range are fully matched
-    for (int rangeId = firstRangeId + 1; rangeId < lastRangeId; rangeId++) {
-      docIds.or(rangeIndexReader.getDocIds(rangeId));
+      return new FilterBlock(new BitmapDocIdSet(matches == null ? new MutableRoaringBitmap() : matches, _numDocs));
     }
-    return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs) {
-
-      // Override this method to reflect the entries scanned
-      @Override
-      public long getNumEntriesScannedInFilter() {
-        return scanBasedDocIdSet.getNumEntriesScannedInFilter();
-      }
-    });
   }
 
   @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index d276612..b71af6a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -69,9 +69,9 @@ import org.apache.pinot.segment.spi.index.ThreadSafeMutableRoaringBitmap;
 import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
-import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.MutableDictionary;
 import org.apache.pinot.segment.spi.index.reader.MutableForwardIndex;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
@@ -1087,7 +1087,7 @@ public class MutableSegmentImpl implements MutableSegment {
     final MutableForwardIndex _forwardIndex;
     final MutableDictionary _dictionary;
     final RealtimeInvertedIndexReader _invertedIndex;
-    final InvertedIndexReader _rangeIndex;
+    final RangeIndexReader _rangeIndex;
     final MutableH3Index _h3Index;
     final RealtimeLuceneTextIndexReader _textIndex;
     final boolean _enableFST;
@@ -1105,7 +1105,7 @@ public class MutableSegmentImpl implements MutableSegment {
     IndexContainer(FieldSpec fieldSpec, @Nullable PartitionFunction partitionFunction,
         @Nullable Set<Integer> partitions, NumValuesInfo numValuesInfo, MutableForwardIndex forwardIndex,
         @Nullable MutableDictionary dictionary, @Nullable RealtimeInvertedIndexReader invertedIndex,
-        @Nullable InvertedIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex, boolean enableFST,
+        @Nullable RangeIndexReader rangeIndex, @Nullable RealtimeLuceneTextIndexReader textIndex, boolean enableFST,
         @Nullable MutableJsonIndex jsonIndex, @Nullable MutableH3Index h3Index, @Nullable BloomFilterReader bloomFilter,
         @Nullable MutableNullValueVector nullValueVector) {
       _fieldSpec = fieldSpec;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/RangeIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/RangeIndexCreator.java
index dd6e03d..5e49c61 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/RangeIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/RangeIndexCreator.java
@@ -443,6 +443,10 @@ public final class RangeIndexCreator implements DictionaryBasedInvertedIndexCrea
     }
   }
 
+  public int getNumValuesPerRange() {
+    return _numValuesPerRange;
+  }
+
   void dump() {
     StringBuilder docIdAsString = new StringBuilder("DocIdBuffer  [ ");
     for (int i = 0; i < _numValues; i++) {
@@ -539,7 +543,7 @@ public final class RangeIndexCreator implements DictionaryBasedInvertedIndexCrea
 
     @Override
     public void put(int position, Number value) {
-      _dataBuffer.putFloat(position << 2, value.intValue());
+      _dataBuffer.putFloat(position << 2, value.floatValue());
     }
 
     @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
index d6f3ba7..20f2064 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java
@@ -37,7 +37,7 @@ import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionar
 import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
-import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReaderImpl;
 import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
 import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory;
 import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
@@ -56,6 +56,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.SortedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
@@ -73,7 +74,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
 
   private final ForwardIndexReader<?> _forwardIndex;
   private final InvertedIndexReader<?> _invertedIndex;
-  private final InvertedIndexReader<?> _rangeIndex;
+  private final RangeIndexReader<?> _rangeIndex;
   private final TextIndexReader _textIndex;
   private final TextIndexReader _fstIndex;
   private final JsonIndexReader _jsonIndex;
@@ -174,7 +175,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
       }
 
       if (loadRangeIndex) {
-        _rangeIndex = new RangeIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX));
+        _rangeIndex = new RangeIndexReaderImpl(segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX));
       } else {
         _rangeIndex = null;
       }
@@ -199,7 +200,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
   }
 
   @Override
-  public InvertedIndexReader<?> getRangeIndex() {
+  public RangeIndexReader<?> getRangeIndex() {
     return _rangeIndex;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
index 5e57d7c..828b7e4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/BaseDataSource.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 
 
@@ -36,7 +37,7 @@ public abstract class BaseDataSource implements DataSource {
   private final ForwardIndexReader<?> _forwardIndex;
   private final Dictionary _dictionary;
   private final InvertedIndexReader<?> _invertedIndex;
-  private final InvertedIndexReader<?> _rangeIndex;
+  private final RangeIndexReader<?> _rangeIndex;
   private final TextIndexReader _textIndex;
   private final TextIndexReader _fstIndex;
   private final JsonIndexReader _jsonIndex;
@@ -46,7 +47,7 @@ public abstract class BaseDataSource implements DataSource {
 
   public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex,
       @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex,
-      @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
+      @Nullable RangeIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex,
       @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable H3IndexReader h3Index,
       @Nullable BloomFilterReader bloomFilter, @Nullable NullValueVectorReader nullValueVector) {
     _dataSourceMetadata = dataSourceMetadata;
@@ -86,7 +87,7 @@ public abstract class BaseDataSource implements DataSource {
 
   @Nullable
   @Override
-  public InvertedIndexReader<?> getRangeIndex() {
+  public RangeIndexReader<?> getRangeIndex() {
     return _rangeIndex;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
index 6671c26..83bc1ab 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/MutableDataSource.java
@@ -28,6 +28,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -43,7 +44,7 @@ public class MutableDataSource extends BaseDataSource {
   public MutableDataSource(FieldSpec fieldSpec, int numDocs, int numValues, int maxNumValuesPerMVEntry,
       @Nullable PartitionFunction partitionFunction, @Nullable Set<Integer> partitions, @Nullable Comparable minValue,
       @Nullable Comparable maxValue, ForwardIndexReader forwardIndex, @Nullable Dictionary dictionary,
-      @Nullable InvertedIndexReader invertedIndex, @Nullable InvertedIndexReader rangeIndex,
+      @Nullable InvertedIndexReader invertedIndex, @Nullable RangeIndexReader rangeIndex,
       @Nullable TextIndexReader textIndex, boolean enableFST, @Nullable JsonIndexReader jsonIndex,
       @Nullable H3IndexReader h3Index, @Nullable BloomFilterReader bloomFilter,
       @Nullable NullValueVectorReader nullValueVector) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
similarity index 59%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReader.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
index feaf02c..a2733ce 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.segment.local.segment.index.readers;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.lang.ref.SoftReference;
 import java.nio.ByteBuffer;
-import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,8 +33,8 @@ import static org.apache.pinot.spi.data.FieldSpec.DataType;
 import static org.apache.pinot.spi.data.FieldSpec.DataType.valueOf;
 
 
-public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> {
-  public static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexReader.class);
+public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBitmap> {
+  public static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexReaderImpl.class);
 
   private final PinotDataBuffer _dataBuffer;
   private final DataType _valueType;
@@ -44,7 +45,7 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
 
   private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
 
-  public RangeIndexReader(PinotDataBuffer dataBuffer) {
+  public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) {
     _dataBuffer = dataBuffer;
     long offset = 0;
     //READER VERSION
@@ -65,9 +66,10 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
     long rangeArrayStartOffset = offset;
 
     _rangeStartArray = new Number[_numRanges];
-    final long lastOffset = dataBuffer.getLong(offset + (_numRanges + 1) * _valueType.size() + _numRanges * Long.BYTES);
+    final long lastOffset = dataBuffer.getLong(offset + (long) (_numRanges + 1) * _valueType.size()
+        + (long) _numRanges * Long.BYTES);
 
-    _bitmapIndexOffset = offset + (_numRanges + 1) * _valueType.size();
+    _bitmapIndexOffset = offset + (long) (_numRanges + 1) * _valueType.size();
 
     Preconditions.checkState(lastOffset == dataBuffer.size(),
         "The last offset should be equal to buffer size! Current lastOffset: " + lastOffset + ", buffer size: "
@@ -75,43 +77,106 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
     switch (_valueType) {
       case INT:
         for (int i = 0; i < _numRanges; i++) {
-          _rangeStartArray[i] = dataBuffer.getInt(rangeArrayStartOffset + i * Integer.BYTES);
+          _rangeStartArray[i] = dataBuffer.getInt(rangeArrayStartOffset + (long) i * Integer.BYTES);
         }
-        _lastRangeEnd = dataBuffer.getInt(rangeArrayStartOffset + _numRanges * Integer.BYTES);
+        _lastRangeEnd = dataBuffer.getInt(rangeArrayStartOffset + (long) _numRanges * Integer.BYTES);
         break;
       case LONG:
         for (int i = 0; i < _numRanges; i++) {
-          _rangeStartArray[i] = dataBuffer.getLong(rangeArrayStartOffset + i * Long.BYTES);
+          _rangeStartArray[i] = dataBuffer.getLong(rangeArrayStartOffset + (long) i * Long.BYTES);
         }
-        _lastRangeEnd = dataBuffer.getLong(rangeArrayStartOffset + _numRanges * Long.BYTES);
+        _lastRangeEnd = dataBuffer.getLong(rangeArrayStartOffset + (long) _numRanges * Long.BYTES);
         break;
       case FLOAT:
         for (int i = 0; i < _numRanges; i++) {
-          _rangeStartArray[i] = dataBuffer.getFloat(rangeArrayStartOffset + i * Float.BYTES);
+          _rangeStartArray[i] = dataBuffer.getFloat(rangeArrayStartOffset + (long) i * Float.BYTES);
         }
-        _lastRangeEnd = dataBuffer.getFloat(rangeArrayStartOffset + _numRanges * Float.BYTES);
+        _lastRangeEnd = dataBuffer.getFloat(rangeArrayStartOffset + (long) _numRanges * Float.BYTES);
         break;
       case DOUBLE:
         for (int i = 0; i < _numRanges; i++) {
-          _rangeStartArray[i] = dataBuffer.getDouble(rangeArrayStartOffset + i * Double.BYTES);
+          _rangeStartArray[i] = dataBuffer.getDouble(rangeArrayStartOffset + (long) i * Double.BYTES);
         }
-        _lastRangeEnd = dataBuffer.getDouble(rangeArrayStartOffset + _numRanges * Double.BYTES);
+        _lastRangeEnd = dataBuffer.getDouble(rangeArrayStartOffset + (long) _numRanges * Double.BYTES);
         break;
       default:
         throw new RuntimeException("Range Index Unsupported for dataType:" + _valueType);
     }
   }
 
-  @VisibleForTesting
-  public Number[] getRangeStartArray() {
-    return _rangeStartArray;
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getMatchingDocIds(long min, long max) {
+    return getMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getMatchingDocIds(int min, int max) {
+    return getMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getMatchingDocIds(double min, double max) {
+    return getMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getMatchingDocIds(float min, float max) {
+    return getMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getPartiallyMatchingDocIds(long min, long max) {
+    return getPartialMatchesInRange(findRangeId(min), findRangeId(max));
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public ImmutableRoaringBitmap getDocIds(int rangeId) {
+  @Nullable
+  public ImmutableRoaringBitmap getPartiallyMatchingDocIds(int min, int max) {
+    return getPartialMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getPartiallyMatchingDocIds(double min, double max) {
+    return getPartialMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  @Nullable
+  public ImmutableRoaringBitmap getPartiallyMatchingDocIds(float min, float max) {
+    return getPartialMatchesInRange(findRangeId(min), findRangeId(max));
+  }
+
+  private ImmutableRoaringBitmap getDocIds(int rangeId) {
     SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null;
     // Return the bitmap if it's still on heap
     if (_bitmaps != null) {
@@ -155,7 +220,7 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
   }
 
   private long getOffset(final int rangeId) {
-    return _dataBuffer.getLong(_bitmapIndexOffset + rangeId * Long.BYTES);
+    return _dataBuffer.getLong(_bitmapIndexOffset + (long) rangeId * Long.BYTES);
   }
 
   /**
@@ -164,52 +229,40 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
    * @param value
    * @return
    */
-  public int findRangeId(int value) {
+  private int findRangeId(int value) {
     for (int i = 0; i < _rangeStartArray.length; i++) {
       if (value < _rangeStartArray[i].intValue()) {
         return i - 1;
       }
     }
-    if (value <= _lastRangeEnd.intValue()) {
-      return _rangeStartArray.length - 1;
-    }
-    return -1;
+    return value <= _lastRangeEnd.intValue() ? _rangeStartArray.length - 1 : _rangeStartArray.length;
   }
 
-  public int findRangeId(long value) {
+  private int findRangeId(long value) {
     for (int i = 0; i < _rangeStartArray.length; i++) {
       if (value < _rangeStartArray[i].longValue()) {
         return i - 1;
       }
     }
-    if (value <= _lastRangeEnd.longValue()) {
-      return _rangeStartArray.length - 1;
-    }
-    return -1;
+    return value <= _lastRangeEnd.longValue() ? _rangeStartArray.length - 1 : _rangeStartArray.length;
   }
 
-  public int findRangeId(float value) {
+  private int findRangeId(float value) {
     for (int i = 0; i < _rangeStartArray.length; i++) {
       if (value < _rangeStartArray[i].floatValue()) {
         return i - 1;
       }
     }
-    if (value <= _lastRangeEnd.floatValue()) {
-      return _rangeStartArray.length - 1;
-    }
-    return -1;
+    return value <= _lastRangeEnd.floatValue() ? _rangeStartArray.length - 1 : _rangeStartArray.length;
   }
 
-  public int findRangeId(double value) {
+  private int findRangeId(double value) {
     for (int i = 0; i < _rangeStartArray.length; i++) {
       if (value < _rangeStartArray[i].doubleValue()) {
         return i - 1;
       }
     }
-    if (value <= _lastRangeEnd.doubleValue()) {
-      return _rangeStartArray.length - 1;
-    }
-    return -1;
+    return value <= _lastRangeEnd.doubleValue() ? _rangeStartArray.length - 1 : _rangeStartArray.length;
   }
 
   @Override
@@ -217,4 +270,30 @@ public class RangeIndexReader implements InvertedIndexReader<ImmutableRoaringBit
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
     // caller is responsible of closing the PinotDataBuffer.
   }
+
+  private ImmutableRoaringBitmap getMatchesInRange(int firstRangeId, int lastRangeId) {
+    // produce bitmap of all ranges fully covered by buckets.
+    // 1. if firstRangeId is -1, the query range covers the first bucket
+    // 2. if lastRangeId is _rangeStartArray.length, the query range covers the last bucket
+    // 3. the loop isn't entered if the range ids are equal
+    MutableRoaringBitmap matching = firstRangeId + 1 < lastRangeId ? new MutableRoaringBitmap() : null;
+    for (int rangeId = firstRangeId + 1; rangeId < lastRangeId; rangeId++) {
+      matching.or(getDocIds(rangeId));
+    }
+    return matching;
+  }
+
+  private ImmutableRoaringBitmap getPartialMatchesInRange(int firstRangeId, int lastRangeId) {
+    if (isOutOfRange(firstRangeId)) {
+      return isOutOfRange(lastRangeId) ? null : getDocIds(lastRangeId);
+    }
+    if (isOutOfRange(lastRangeId)) {
+      return getDocIds(firstRangeId);
+    }
+    return ImmutableRoaringBitmap.or(getDocIds(firstRangeId), getDocIds(lastRangeId));
+  }
+
+  private boolean isOutOfRange(int rangeId) {
+    return rangeId < 0 || rangeId >= _rangeStartArray.length;
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnIndexContainer.java
index 4dc6282..16b9941 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnIndexContainer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnIndexContainer.java
@@ -27,6 +27,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 
 
@@ -56,7 +57,7 @@ public class VirtualColumnIndexContainer implements ColumnIndexContainer {
   }
 
   @Override
-  public InvertedIndexReader<?> getRangeIndex() {
+  public RangeIndexReader<?> getRangeIndex() {
     return null;
   }
 
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RangeIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RangeIndexCreatorTest.java
index a3289e6..956b1c9 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RangeIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/RangeIndexCreatorTest.java
@@ -20,10 +20,12 @@ package org.apache.pinot.segment.local.segment.index.creator;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
-import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReaderImpl;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -35,11 +37,12 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.segment.spi.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.*;
 
 
 public class RangeIndexCreatorTest {
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RangeIndexCreatorTest");
-  private static final Random RANDOM = new Random();
+  private static final Random RANDOM = new Random(42);
   private static final String COLUMN_NAME = "testColumn";
 
   @BeforeClass
@@ -106,23 +109,22 @@ public class RangeIndexCreatorTest {
       throws IOException {
     FieldSpec fieldSpec = new DimensionFieldSpec(COLUMN_NAME, dataType, true);
     int numDocs = 1000;
-    Number[] values = new Number[numDocs];
+    Object values = valuesArray(dataType, numDocs);
 
+    int numValuesPerRange;
     try (RangeIndexCreator creator = new RangeIndexCreator(INDEX_DIR, fieldSpec, dataType, -1, -1, numDocs, numDocs)) {
       addDataToIndexer(dataType, numDocs, 1, creator, values);
       creator.seal();
+      // account for off by one bug in v1 implementation
+      numValuesPerRange = creator.getNumValuesPerRange() + 1;
     }
 
     File rangeIndexFile = new File(INDEX_DIR, COLUMN_NAME + BITMAP_RANGE_INDEX_FILE_EXTENSION);
     try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile)) {
-      RangeIndexReader rangeIndexReader = new RangeIndexReader(dataBuffer);
-      Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
-      for (int rangeId = 0; rangeId < rangeStartArray.length; rangeId++) {
-        ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
-        for (int docId : bitmap.toArray()) {
-          checkValueForDocId(dataType, values, rangeStartArray, rangeId, docId, 1);
-        }
-      }
+      RangeIndexReaderImpl rangeIndexReader = new RangeIndexReaderImpl(dataBuffer);
+      verifyRangesForDataType(dataType, values,
+          splitIntoRanges(dataType, values, numValuesPerRange),
+          1, rangeIndexReader);
     }
 
     FileUtils.forceDelete(rangeIndexFile);
@@ -134,38 +136,36 @@ public class RangeIndexCreatorTest {
     int numDocs = 1000;
     int numValuesPerMVEntry = 10;
     int numValues = numDocs * numValuesPerMVEntry;
-    Number[] values = new Number[numValues];
+    Object values = valuesArray(dataType, numValues);
 
+    int numValuesPerRange;
     try (
         RangeIndexCreator creator = new RangeIndexCreator(INDEX_DIR, fieldSpec, dataType, -1, -1, numDocs, numValues)) {
       addDataToIndexer(dataType, numDocs, numValuesPerMVEntry, creator, values);
       creator.seal();
+      // account for off by one bug in existing implementation
+      numValuesPerRange = creator.getNumValuesPerRange() + 1;
     }
 
     File rangeIndexFile = new File(INDEX_DIR, COLUMN_NAME + BITMAP_RANGE_INDEX_FILE_EXTENSION);
     try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile)) {
-      RangeIndexReader rangeIndexReader = new RangeIndexReader(dataBuffer);
-      Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
-      int numRanges = rangeStartArray.length;
-      for (int rangeId = 0; rangeId < numRanges; rangeId++) {
-        ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
-        for (int docId : bitmap.toArray()) {
-          checkValueForDocId(dataType, values, rangeStartArray, rangeId, docId, numValuesPerMVEntry);
-        }
-      }
+      RangeIndexReaderImpl rangeIndexReader = new RangeIndexReaderImpl(dataBuffer);
+      verifyRangesForDataType(dataType, values,
+          splitIntoRanges(dataType, values, numValuesPerRange),
+          numValuesPerMVEntry, rangeIndexReader);
     }
 
     FileUtils.forceDelete(rangeIndexFile);
   }
 
   private void addDataToIndexer(DataType dataType, int numDocs, int numValuesPerEntry, RangeIndexCreator creator,
-      Number[] values) {
+      Object values) {
     switch (dataType) {
       case INT:
         if (numValuesPerEntry == 1) {
           for (int i = 0; i < numDocs; i++) {
             int value = RANDOM.nextInt();
-            values[i] = value;
+            ((int[]) values)[i] = value;
             creator.add(value);
           }
         } else {
@@ -174,7 +174,7 @@ public class RangeIndexCreatorTest {
             for (int j = 0; j < numValuesPerEntry; j++) {
               int value = RANDOM.nextInt();
               intValues[j] = value;
-              values[i * numValuesPerEntry + j] = value;
+              ((int[]) values)[i * numValuesPerEntry + j] = value;
             }
             creator.add(intValues, numValuesPerEntry);
           }
@@ -184,7 +184,7 @@ public class RangeIndexCreatorTest {
         if (numValuesPerEntry == 1) {
           for (int i = 0; i < numDocs; i++) {
             long value = RANDOM.nextLong();
-            values[i] = value;
+            ((long[]) values)[i] = value;
             creator.add(value);
           }
         } else {
@@ -193,7 +193,7 @@ public class RangeIndexCreatorTest {
             for (int j = 0; j < numValuesPerEntry; j++) {
               long value = RANDOM.nextLong();
               longValues[j] = value;
-              values[i * numValuesPerEntry + j] = value;
+              ((long[]) values)[i * numValuesPerEntry + j] = value;
             }
             creator.add(longValues, numValuesPerEntry);
           }
@@ -203,7 +203,7 @@ public class RangeIndexCreatorTest {
         if (numValuesPerEntry == 1) {
           for (int i = 0; i < numDocs; i++) {
             float value = RANDOM.nextFloat();
-            values[i] = value;
+            ((float[]) values)[i] = value;
             creator.add(value);
           }
         } else {
@@ -212,7 +212,7 @@ public class RangeIndexCreatorTest {
             for (int j = 0; j < numValuesPerEntry; j++) {
               float value = RANDOM.nextFloat();
               floatValues[j] = value;
-              values[i * numValuesPerEntry + j] = value;
+              ((float[]) values)[i * numValuesPerEntry + j] = value;
             }
             creator.add(floatValues, numValuesPerEntry);
           }
@@ -222,7 +222,7 @@ public class RangeIndexCreatorTest {
         if (numValuesPerEntry == 1) {
           for (int i = 0; i < numDocs; i++) {
             double value = RANDOM.nextDouble();
-            values[i] = value;
+            ((double[]) values)[i] = value;
             creator.add(value);
           }
         } else {
@@ -231,7 +231,7 @@ public class RangeIndexCreatorTest {
             for (int j = 0; j < numValuesPerEntry; j++) {
               double value = RANDOM.nextDouble();
               doubleValues[j] = value;
-              values[i * numValuesPerEntry + j] = value;
+              ((double[]) values)[i * numValuesPerEntry + j] = value;
             }
             creator.add(doubleValues, numValuesPerEntry);
           }
@@ -242,141 +242,352 @@ public class RangeIndexCreatorTest {
     }
   }
 
-  private void checkValueForDocId(DataType dataType, Number[] values, Number[] rangeStartArray, int rangeId, int docId,
+  private void verifyRangesForDataType(DataType dataType, Object values, Object ranges, int numValuesPerMVEntry,
+                                       RangeIndexReader<ImmutableRoaringBitmap> rangeIndexReader) {
+    switch (dataType) {
+      case INT: {
+        // single bucket ranges
+        int rangeId = 0;
+        for (int[] range : (int[][]) ranges) {
+          assertNull(rangeIndexReader.getMatchingDocIds(range[0], range[1]),
+              "range index can't guarantee match within a single range");
+          ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(range[0], range[1]);
+          assertNotNull(partialMatches, "partial matches for single range must not be null");
+          for (int docId : partialMatches.toArray()) {
+            checkValueForDocId(dataType, values, ranges, rangeId, docId, numValuesPerMVEntry);
+          }
+          ++rangeId;
+        }
+        // multi bucket ranges
+        int[] lowerPartialRange = ((int[][]) ranges)[0];
+        int[] coveredRange = ((int[][]) ranges)[1];
+        int[] upperPartialRange = ((int[][]) ranges)[2];
+        ImmutableRoaringBitmap matches = rangeIndexReader.getMatchingDocIds(lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(matches,  "matches for covered range must not be null");
+        for (int docId : matches.toArray()) {
+          checkValueForDocId(dataType, values, ranges, 1, docId, numValuesPerMVEntry);
+        }
+        assertEquals(matches, rangeIndexReader.getPartiallyMatchingDocIds(coveredRange[0], coveredRange[1]));
+        // partial matches must be the combination of the two edge buckets
+        ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(
+            lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(partialMatches, "partial matches for single range must not be null");
+        assertEquals(ImmutableRoaringBitmap.or(
+            rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], lowerPartialRange[1]),
+            rangeIndexReader.getPartiallyMatchingDocIds(upperPartialRange[0], upperPartialRange[1])), partialMatches);
+        // edge cases
+        assertEquals(((int[]) values).length, numValuesPerMVEntry
+            * rangeIndexReader.getMatchingDocIds(Integer.MIN_VALUE, Integer.MAX_VALUE).getCardinality());
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Integer.MIN_VALUE, Integer.MAX_VALUE));
+        assertNull(rangeIndexReader.getMatchingDocIds(Integer.MIN_VALUE, Integer.MIN_VALUE));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Integer.MIN_VALUE, Integer.MIN_VALUE));
+        assertNull(rangeIndexReader.getMatchingDocIds(Integer.MAX_VALUE, Integer.MAX_VALUE));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Integer.MAX_VALUE, Integer.MAX_VALUE));
+        assertEquals(rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], coveredRange[1]),
+            rangeIndexReader.getMatchingDocIds(Integer.MIN_VALUE, upperPartialRange[1]));
+        break;
+      }
+      case LONG: {
+        // single bucket ranges
+        int rangeId = 0;
+        for (long[] range : (long[][]) ranges) {
+          assertNull(rangeIndexReader.getMatchingDocIds(range[0], range[1]),
+              "range index can't guarantee match within a single range");
+          ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(range[0], range[1]);
+          assertNotNull(partialMatches, "partial matches for single range must not be null");
+          for (int docId : partialMatches.toArray()) {
+            checkValueForDocId(dataType, values, ranges, rangeId, docId, numValuesPerMVEntry);
+          }
+          ++rangeId;
+        }
+        // multi bucket ranges
+        long[] lowerPartialRange = ((long[][]) ranges)[0];
+        long[] coveredRange = ((long[][]) ranges)[1];
+        long[] upperPartialRange = ((long[][]) ranges)[2];
+        ImmutableRoaringBitmap matches = rangeIndexReader.getMatchingDocIds(lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(matches,  "matches for covered range must not be null");
+        for (int docId : matches.toArray()) {
+          checkValueForDocId(dataType, values, ranges, 1, docId, numValuesPerMVEntry);
+        }
+        assertEquals(matches, rangeIndexReader.getPartiallyMatchingDocIds(coveredRange[0], coveredRange[1]));
+        // partial matches must be the combination of the two edge buckets
+        ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(
+            lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(partialMatches, "partial matches for single range must not be null");
+        assertEquals(ImmutableRoaringBitmap.or(
+            rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], lowerPartialRange[1]),
+            rangeIndexReader.getPartiallyMatchingDocIds(upperPartialRange[0], upperPartialRange[1])), partialMatches);
+        // edge cases
+        assertEquals(((long[]) values).length, numValuesPerMVEntry
+            * rangeIndexReader.getMatchingDocIds(Long.MIN_VALUE, Long.MAX_VALUE).getCardinality());
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Long.MIN_VALUE, Long.MAX_VALUE));
+        assertNull(rangeIndexReader.getMatchingDocIds(Long.MIN_VALUE, Long.MIN_VALUE));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Long.MIN_VALUE, Long.MIN_VALUE));
+        assertNull(rangeIndexReader.getMatchingDocIds(Long.MAX_VALUE, Long.MAX_VALUE));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Long.MAX_VALUE, Long.MAX_VALUE));
+        assertEquals(rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], coveredRange[1]),
+            rangeIndexReader.getMatchingDocIds(Long.MIN_VALUE, upperPartialRange[1]));
+        break;
+      }
+      case FLOAT: {
+        // single bucket ranges
+        int rangeId = 0;
+        for (float[] range : (float[][]) ranges) {
+          assertNull(rangeIndexReader.getMatchingDocIds(range[0], range[1]),
+              "range index can't guarantee match within a single range");
+          ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(range[0], range[1]);
+          assertNotNull(partialMatches, "partial matches for single range must not be null");
+          for (int docId : partialMatches.toArray()) {
+            checkValueForDocId(dataType, values, ranges, rangeId, docId, numValuesPerMVEntry);
+          }
+          ++rangeId;
+        }
+        // multi bucket ranges
+        float[] lowerPartialRange = ((float[][]) ranges)[0];
+        float[] coveredRange = ((float[][]) ranges)[1];
+        float[] upperPartialRange = ((float[][]) ranges)[2];
+        ImmutableRoaringBitmap matches = rangeIndexReader.getMatchingDocIds(lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(matches,  "matches for covered range must not be null");
+        for (int docId : matches.toArray()) {
+          checkValueForDocId(dataType, values, ranges, 1, docId, numValuesPerMVEntry);
+        }
+        assertEquals(matches, rangeIndexReader.getPartiallyMatchingDocIds(coveredRange[0], coveredRange[1]));
+        // partial matches must be the combination of the two edge buckets
+        ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(
+            lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(partialMatches, "partial matches for single range must not be null");
+        assertEquals(ImmutableRoaringBitmap.or(
+            rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], lowerPartialRange[1]),
+            rangeIndexReader.getPartiallyMatchingDocIds(upperPartialRange[0], upperPartialRange[1])), partialMatches);
+        // edge cases
+        assertEquals(((float[]) values).length, numValuesPerMVEntry
+            * rangeIndexReader.getMatchingDocIds(Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY).getCardinality());
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Float.NEGATIVE_INFINITY, Float.POSITIVE_INFINITY));
+        assertNull(rangeIndexReader.getMatchingDocIds(Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Float.NEGATIVE_INFINITY, Float.NEGATIVE_INFINITY));
+        assertNull(rangeIndexReader.getMatchingDocIds(Float.POSITIVE_INFINITY, Float.POSITIVE_INFINITY));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Float.POSITIVE_INFINITY, Float.POSITIVE_INFINITY));
+        assertEquals(rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], coveredRange[1]),
+            rangeIndexReader.getMatchingDocIds(Float.NEGATIVE_INFINITY, upperPartialRange[1]));
+        break;
+      }
+      case DOUBLE: {
+        // single bucket ranges
+        int rangeId = 0;
+        for (double[] range : (double[][]) ranges) {
+          assertNull(rangeIndexReader.getMatchingDocIds(range[0], range[1]),
+              "range index can't guarantee match within a single range");
+          ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(range[0], range[1]);
+          assertNotNull(partialMatches, "partial matches for single range must not be null");
+          for (int docId : partialMatches.toArray()) {
+            checkValueForDocId(dataType, values, ranges, rangeId, docId, numValuesPerMVEntry);
+          }
+          ++rangeId;
+        }
+        // multi bucket ranges
+        double[] lowerPartialRange = ((double[][]) ranges)[0];
+        double[] coveredRange = ((double[][]) ranges)[1];
+        double[] upperPartialRange = ((double[][]) ranges)[2];
+        ImmutableRoaringBitmap matches = rangeIndexReader.getMatchingDocIds(lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(matches,  "matches for covered range must not be null");
+        for (int docId : matches.toArray()) {
+          checkValueForDocId(dataType, values, ranges, 1, docId, numValuesPerMVEntry);
+        }
+        assertEquals(matches, rangeIndexReader.getPartiallyMatchingDocIds(coveredRange[0], coveredRange[1]));
+        // partial matches must be the combination of the two edge buckets
+        ImmutableRoaringBitmap partialMatches = rangeIndexReader.getPartiallyMatchingDocIds(
+            lowerPartialRange[0], upperPartialRange[1]);
+        assertNotNull(partialMatches, "partial matches for single range must not be null");
+        assertEquals(ImmutableRoaringBitmap.or(
+            rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], lowerPartialRange[1]),
+            rangeIndexReader.getPartiallyMatchingDocIds(upperPartialRange[0], upperPartialRange[1])), partialMatches);
+        // edge cases
+        assertEquals(((double[]) values).length, numValuesPerMVEntry
+            * rangeIndexReader.getMatchingDocIds(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY).getCardinality());
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY));
+        assertNull(rangeIndexReader.getMatchingDocIds(Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Double.NEGATIVE_INFINITY, Double.NEGATIVE_INFINITY));
+        assertNull(rangeIndexReader.getMatchingDocIds(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY));
+        assertNull(rangeIndexReader.getPartiallyMatchingDocIds(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY));
+        assertEquals(rangeIndexReader.getPartiallyMatchingDocIds(lowerPartialRange[0], coveredRange[1]),
+            rangeIndexReader.getMatchingDocIds(Double.NEGATIVE_INFINITY, upperPartialRange[1]));
+        break;
+      }
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  private void checkValueForDocId(DataType dataType, Object values, Object ranges, int rangeId, int docId,
       int numValuesPerEntry) {
     switch (dataType) {
       case INT:
         if (numValuesPerEntry == 1) {
-          checkInt(rangeStartArray, rangeId, values[docId].intValue());
+          checkInt((int[][]) ranges, rangeId, ((int[]) values)[docId]);
         } else {
-          checkIntMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+          checkIntMV((int[][]) ranges, rangeId, (int[]) values, docId, numValuesPerEntry);
         }
         break;
       case LONG:
         if (numValuesPerEntry == 1) {
-          checkLong(rangeStartArray, rangeId, values[docId].longValue());
+          checkLong((long[][]) ranges, rangeId, ((long[]) values)[docId]);
         } else {
-          checkLongMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+          checkLongMV((long[][]) ranges, rangeId, (long[]) values, docId, numValuesPerEntry);
         }
         break;
       case FLOAT:
         if (numValuesPerEntry == 1) {
-          checkFloat(rangeStartArray, rangeId, values[docId].floatValue());
+          checkFloat((float[][]) ranges, rangeId, ((float[]) values)[docId]);
         } else {
-          checkFloatMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+          checkFloatMV((float[][]) ranges, rangeId, (float[]) values, docId, numValuesPerEntry);
         }
         break;
       case DOUBLE:
         if (numValuesPerEntry == 1) {
-          checkDouble(rangeStartArray, rangeId, values[docId].doubleValue());
+          checkDouble((double[][]) ranges, rangeId, ((double[]) values)[docId]);
         } else {
-          checkDoubleMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+          checkDoubleMV((double[][]) ranges, rangeId, (double[]) values, docId, numValuesPerEntry);
         }
         break;
       default:
-        throw new IllegalStateException();
+        throw new IllegalStateException("unexpected type " + dataType);
     }
   }
 
-  private void checkInt(Number[] rangeStartArray, int rangeId, int value) {
-    Assert.assertTrue(rangeStartArray[rangeId].intValue() <= value);
-    if (rangeId != rangeStartArray.length - 1) {
-      Assert.assertTrue(value < rangeStartArray[rangeId + 1].intValue());
-    }
+  private void checkInt(int[][] ranges, int rangeId, int value) {
+    Assert.assertTrue(ranges[rangeId][0] <= value);
+    Assert.assertTrue(value <= ranges[rangeId][1]);
   }
 
-  private void checkIntMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId, int numValuesPerMVEntry) {
-    if (rangeId != rangeStartArray.length - 1) {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].intValue() <= values[docId * numValuesPerMVEntry + i].intValue()
-            && values[docId * numValuesPerMVEntry + i].intValue() < rangeStartArray[rangeId + 1].intValue()) {
-          return;
-        }
-      }
-    } else {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].intValue() <= values[docId * numValuesPerMVEntry + i].intValue()) {
-          return;
-        }
+  private void checkIntMV(int[][] ranges, int rangeId, int[] values, int docId, int numValuesPerMVEntry) {
+    for (int i = 0; i < numValuesPerMVEntry; i++) {
+      if (ranges[rangeId][0] <= values[docId * numValuesPerMVEntry + i]
+          && values[docId * numValuesPerMVEntry + i] <= ranges[rangeId][1]) {
+        return;
       }
     }
     Assert.fail();
   }
 
-  private void checkLong(Number[] rangeStartArray, int rangeId, long value) {
-    Assert.assertTrue(rangeStartArray[rangeId].longValue() <= value);
-    if (rangeId != rangeStartArray.length - 1) {
-      Assert.assertTrue(value < rangeStartArray[rangeId + 1].longValue());
-    }
+  private void checkLong(long[][] ranges, int rangeId, long value) {
+    Assert.assertTrue(ranges[rangeId][0] <= value);
+    Assert.assertTrue(value <= ranges[rangeId][1]);
   }
 
-  private void checkLongMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId, int numValuesPerMVEntry) {
-    if (rangeId != rangeStartArray.length - 1) {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].longValue() <= values[docId * numValuesPerMVEntry + i].longValue()
-            && values[docId * numValuesPerMVEntry + i].longValue() < rangeStartArray[rangeId + 1].longValue()) {
-          return;
-        }
-      }
-    } else {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].longValue() <= values[docId * numValuesPerMVEntry + i].longValue()) {
-          return;
-        }
+  private void checkLongMV(long[][] ranges, int rangeId, long[] values, int docId, int numValuesPerMVEntry) {
+    for (int i = 0; i < numValuesPerMVEntry; i++) {
+      if (ranges[rangeId][0] <= values[docId * numValuesPerMVEntry + i]
+          && values[docId * numValuesPerMVEntry + i] <= ranges[rangeId][1]) {
+        return;
       }
     }
     Assert.fail();
   }
 
-  private void checkFloat(Number[] rangeStartArray, int rangeId, float value) {
-    Assert.assertTrue(rangeStartArray[rangeId].floatValue() <= value);
-    if (rangeId != rangeStartArray.length - 1) {
-      Assert.assertTrue(value < rangeStartArray[rangeId + 1].floatValue());
-    }
+  private void checkFloat(float[][] ranges, int rangeId, float value) {
+    Assert.assertTrue(ranges[rangeId][0] <= value);
+    Assert.assertTrue(value <= ranges[rangeId][1]);
   }
 
-  private void checkFloatMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId,
+  private void checkFloatMV(float[][] ranges, int rangeId, float[] values, int docId,
       int numValuesPerMVEntry) {
-    if (rangeId != rangeStartArray.length - 1) {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].floatValue() <= values[docId * numValuesPerMVEntry + i].floatValue()
-            && values[docId * numValuesPerMVEntry + i].floatValue() < rangeStartArray[rangeId + 1].floatValue()) {
-          return;
-        }
+    for (int i = 0; i < numValuesPerMVEntry; i++) {
+      if (ranges[rangeId][0] <= values[docId * numValuesPerMVEntry + i]
+          && values[docId * numValuesPerMVEntry + i] <= ranges[rangeId][1]) {
+        return;
       }
-    } else {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].floatValue() <= values[docId * numValuesPerMVEntry + i].floatValue()) {
-          return;
-        }
+    }
+    Assert.fail();
+  }
+
+  private void checkDouble(double[][] ranges, int rangeId, double value) {
+    Assert.assertTrue(ranges[rangeId][0] <= value);
+    Assert.assertTrue(value <= ranges[rangeId][1]);
+  }
+
+  private void checkDoubleMV(double[][] ranges, int rangeId, double[] values, int docId,
+      int numValuesPerMVEntry) {
+    for (int i = 0; i < numValuesPerMVEntry; i++) {
+      if (ranges[rangeId][0] <= values[docId * numValuesPerMVEntry + i]
+          && values[docId * numValuesPerMVEntry + i] <= ranges[rangeId][1]) {
+        return;
       }
     }
     Assert.fail();
   }
 
-  private void checkDouble(Number[] rangeStartArray, int rangeId, double value) {
-    Assert.assertTrue(rangeStartArray[rangeId].doubleValue() <= value);
-    if (rangeId != rangeStartArray.length - 1) {
-      Assert.assertTrue(value < rangeStartArray[rangeId + 1].doubleValue());
+
+  private static Object valuesArray(DataType dataType, int numValues) {
+    switch (dataType) {
+      case INT:
+        return new int[numValues];
+      case LONG:
+        return new long[numValues];
+      case FLOAT:
+        return new float[numValues];
+      case DOUBLE:
+        return new double[numValues];
+      default:
+        throw new IllegalArgumentException("unexpected type " + dataType);
     }
   }
 
-  private void checkDoubleMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId,
-      int numValuesPerMVEntry) {
-    if (rangeId != rangeStartArray.length - 1) {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].doubleValue() <= values[docId * numValuesPerMVEntry + i].doubleValue()
-            && values[docId * numValuesPerMVEntry + i].doubleValue() < rangeStartArray[rangeId + 1].doubleValue()) {
-          return;
+  private static Object splitIntoRanges(DataType dataType, Object values, int numValuesPerRange) {
+    switch (dataType) {
+      case INT: {
+        int[] ints = (int[]) values;
+        int[] sorted = Arrays.copyOf(ints, ints.length);
+        Arrays.sort(sorted);
+        int[][] split = new int[ints.length / numValuesPerRange + 1][2];
+        for (int i = 0; i < split.length - 1; ++i) {
+          split[i][0] = sorted[i * numValuesPerRange];
+          split[i][1] = sorted[(i + 1) * numValuesPerRange - 1];
+        }
+        split[split.length - 1][0] = sorted[(split.length - 1) * numValuesPerRange];
+        split[split.length - 1][1] = sorted[sorted.length - 1];
+        return split;
+      }
+      case LONG: {
+        long[] longs = (long[]) values;
+        long[] sorted = Arrays.copyOf(longs, longs.length);
+        Arrays.sort(sorted);
+        long[][] split = new long[longs.length / numValuesPerRange + 1][2];
+        for (int i = 0; i < split.length - 1; ++i) {
+          split[i][0] = sorted[i * numValuesPerRange];
+          split[i][1] = sorted[(i + 1) * numValuesPerRange - 1];
         }
+        split[split.length - 1][0] = sorted[(split.length - 1) * numValuesPerRange];
+        split[split.length - 1][1] = sorted[sorted.length - 1];
+        return split;
       }
-    } else {
-      for (int i = 0; i < numValuesPerMVEntry; i++) {
-        if (rangeStartArray[rangeId].doubleValue() <= values[docId * numValuesPerMVEntry + i].doubleValue()) {
-          return;
+      case FLOAT: {
+        float[] floats = (float[]) values;
+        float[] sorted = Arrays.copyOf(floats, floats.length);
+        Arrays.sort(sorted);
+        float[][] split = new float[floats.length / numValuesPerRange + 1][2];
+        for (int i = 0; i < split.length - 1; ++i) {
+          split[i][0] = sorted[i * numValuesPerRange];
+          split[i][1] = sorted[(i + 1) * numValuesPerRange - 1];
         }
+        split[split.length - 1][0] = sorted[(split.length - 1) * numValuesPerRange];
+        split[split.length - 1][1] = sorted[sorted.length - 1];
+        return split;
       }
+      case DOUBLE: {
+        double[] doubles = (double[]) values;
+        double[] sorted = Arrays.copyOf(doubles, doubles.length);
+        Arrays.sort(sorted);
+        double[][] split = new double[doubles.length / numValuesPerRange + 1][2];
+        for (int i = 0; i < split.length - 1; ++i) {
+          split[i][0] = sorted[i * numValuesPerRange];
+          split[i][1] = sorted[(i + 1) * numValuesPerRange - 1];
+        }
+        split[split.length - 1][0] = sorted[(split.length - 1) * numValuesPerRange];
+        split[split.length - 1][1] = sorted[sorted.length - 1];
+        return split;
+      }
+      default:
+        throw new IllegalArgumentException("unexpected type " + dataType);
     }
-    Assert.fail();
   }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
index 013e749..e260de2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/datasource/DataSource.java
@@ -26,6 +26,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 
 
@@ -61,7 +62,7 @@ public interface DataSource {
    * <p>TODO: Have a separate interface for range index.
    */
   @Nullable
-  InvertedIndexReader<?> getRangeIndex();
+  RangeIndexReader<?> getRangeIndex();
 
   /**
    * Returns the text index for the column if exists, or {@code null} if not.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/column/ColumnIndexContainer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/column/ColumnIndexContainer.java
index 71b83bf..6fdc28e 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/column/ColumnIndexContainer.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/column/ColumnIndexContainer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.segment.spi.index.reader.H3IndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.reader.JsonIndexReader;
 import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
+import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
 import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
 
 
@@ -47,7 +48,7 @@ public interface ColumnIndexContainer extends Closeable {
   /**
    * Returns the range index for the column, or {@code null} if it does not exist.
    */
-  InvertedIndexReader<?> getRangeIndex();
+  RangeIndexReader<?> getRangeIndex();
 
   /**
    * Returns the text index for the column, or {@code null} if it does not exist.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/RangeIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/RangeIndexReader.java
new file mode 100644
index 0000000..3aeeaf9
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/RangeIndexReader.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.index.reader;
+
+import java.io.Closeable;
+import javax.annotation.Nullable;
+
+/**
+ * Interface for indexed range queries
+ * @param <T>
+ */
+public interface RangeIndexReader<T> extends Closeable {
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method must correspond to values which
+   * satisfy the query.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getMatchingDocIds(int min, int max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method must correspond to values which
+   * satisfy the query.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getMatchingDocIds(long min, long max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method must correspond to values which
+   * satisfy the query.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getMatchingDocIds(float min, float max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method must correspond to values which
+   * satisfy the query.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getMatchingDocIds(double min, double max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method may correspond to values which
+   * satisfy the query, and require post filtering. If the implementation
+   * supports exact matches, this method will return null.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getPartiallyMatchingDocIds(int min, int max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method may correspond to values which
+   * satisfy the query, and require post filtering. If the implementation
+   * supports exact matches, this method will return null.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getPartiallyMatchingDocIds(long min, long max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method may correspond to values which
+   * satisfy the query, and require post filtering. If the implementation
+   * supports exact matches, this method will return null.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getPartiallyMatchingDocIds(float min, float max);
+
+  /**
+   * Returns doc ids with a value between min and max, both inclusive.
+   * Doc ids returned by this method may correspond to values which
+   * satisfy the query, and require post filtering. If the implementation
+   * supports exact matches, this method will return null.
+   * @param min the inclusive lower bound.
+   * @param max the inclusive upper bound.
+   * @return the matching doc ids.
+   */
+  @Nullable
+  T getPartiallyMatchingDocIds(double min, double max);
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org