You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/08/29 07:41:37 UTC

[incubator-pinot] branch master updated: Add Range Indexing support for raw values (#5853)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a892fb4  Add Range Indexing support for raw values (#5853)
a892fb4 is described below

commit a892fb40bbc131017629f9149c352c25c5388598
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Sat Aug 29 13:11:28 2020 +0530

    Add Range Indexing support for raw values (#5853)
    
    - Create new interface RawValueBasedRangeIndexCreator
    - Add support for raw value range index
    
    Following refactoring has been done in this PR:
    - Create a marker interface InvertedIndexCreator
    - Rename the existing InvertedIndexCreator interface to DictionaryBasedInvertedIndexCreator
    - Create a new interface RawValueBasedInvertedIndexCreator which has add function for all data types instead of just int
    - Create a new RawValueBasedRangeIndexCreator which extends RawValueBasedInvertedIndexCreator
---
 .../filter/RangeIndexBasedFilterOperator.java      |  52 ++-
 .../predicate/RangePredicateEvaluatorFactory.java  |  40 +-
 ...va => DictionaryBasedInvertedIndexCreator.java} |  24 +-
 .../core/segment/creator/InvertedIndexCreator.java |  52 +--
 .../creator/RawValueBasedInvertedIndexCreator.java |  62 ++++
 .../creator/impl/SegmentColumnarIndexCreator.java  |  21 +-
 .../inv/OffHeapBitmapInvertedIndexCreator.java     |   8 +-
 .../impl/inv/OnHeapBitmapInvertedIndexCreator.java |   6 +-
 .../creator/impl/inv/RangeIndexCreator.java        | 153 +++++---
 .../impl/inv/text/LuceneTextIndexCreator.java      |  35 +-
 .../loader/invertedindex/RangeIndexHandler.java    | 129 +++++--
 .../index/creator/RangeIndexCreatorTest.java       | 402 ++++++++++++++++-----
 12 files changed, 697 insertions(+), 287 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
index 0297ec0..d9eee0b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java
@@ -23,6 +23,11 @@ import org.apache.pinot.core.operator.blocks.FilterBlock;
 import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
 import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
 import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.DoubleRawValueBasedRangePredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.FloatRawValueBasedRangePredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.IntRawValueBasedRangePredicateEvaluator;
+import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.LongRawValueBasedRangePredicateEvaluator;
 import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.OfflineDictionaryBasedRangePredicateEvaluator;
 import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
@@ -34,12 +39,11 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator {
 
   // NOTE: Range index can only apply to dictionary-encoded columns for now
   // TODO: Support raw index columns
-  private final OfflineDictionaryBasedRangePredicateEvaluator _rangePredicateEvaluator;
+  private final PredicateEvaluator _rangePredicateEvaluator;
   private final DataSource _dataSource;
   private final int _numDocs;
 
-  public RangeIndexBasedFilterOperator(OfflineDictionaryBasedRangePredicateEvaluator rangePredicateEvaluator,
-      DataSource dataSource, int numDocs) {
+  public RangeIndexBasedFilterOperator(PredicateEvaluator rangePredicateEvaluator, DataSource dataSource, int numDocs) {
     _rangePredicateEvaluator = rangePredicateEvaluator;
     _dataSource = dataSource;
     _numDocs = numDocs;
@@ -49,9 +53,45 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator {
   protected FilterBlock getNextBlock() {
     RangeIndexReader rangeIndexReader = (RangeIndexReader) _dataSource.getRangeIndex();
     assert rangeIndexReader != null;
-    int firstRangeId = rangeIndexReader.findRangeId(_rangePredicateEvaluator.getStartDictId());
-    // NOTE: End dictionary id is exclusive in OfflineDictionaryBasedRangePredicateEvaluator.
-    int lastRangeId = rangeIndexReader.findRangeId(_rangePredicateEvaluator.getEndDictId() - 1);
+
+    int firstRangeId;
+    int lastRangeId;
+    if (_rangePredicateEvaluator instanceof OfflineDictionaryBasedRangePredicateEvaluator) {
+      firstRangeId = rangeIndexReader
+          .findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getStartDictId());
+      // NOTE: End dictionary id is exclusive in OfflineDictionaryBasedRangePredicateEvaluator.
+      lastRangeId = rangeIndexReader
+          .findRangeId(((OfflineDictionaryBasedRangePredicateEvaluator) _rangePredicateEvaluator).getEndDictId() - 1);
+    } else {
+      switch (_rangePredicateEvaluator.getDataType()) {
+        case INT:
+          firstRangeId = rangeIndexReader
+              .findRangeId(((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
+          lastRangeId = rangeIndexReader
+              .findRangeId(((IntRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+          break;
+        case LONG:
+          firstRangeId = rangeIndexReader
+              .findRangeId(((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
+          lastRangeId = rangeIndexReader
+              .findRangeId(((LongRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+          break;
+        case FLOAT:
+          firstRangeId = rangeIndexReader
+              .findRangeId(((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
+          lastRangeId = rangeIndexReader
+              .findRangeId(((FloatRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+          break;
+        case DOUBLE:
+          firstRangeId = rangeIndexReader
+              .findRangeId(((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).geLowerBound());
+          lastRangeId = rangeIndexReader
+              .findRangeId(((DoubleRawValueBasedRangePredicateEvaluator) _rangePredicateEvaluator).getUpperBound());
+          break;
+        default:
+          throw new IllegalStateException("String and Bytes data type not supported for Range Indexing");
+      }
+    }
 
     // Need to scan the first and last range as they might be partially matched
     // TODO: Detect fully matched first and last range
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java
index 983c8e9..6ec5aad 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/predicate/RangePredicateEvaluatorFactory.java
@@ -262,7 +262,7 @@ public class RangePredicateEvaluatorFactory {
     }
   }
 
-  private static final class IntRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
+  public static final class IntRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
     final int _lowerBound;
     final int _upperBound;
     final boolean _lowerInclusive;
@@ -280,6 +280,14 @@ public class RangePredicateEvaluatorFactory {
       _upperInclusive = upperUnbounded || rangePredicate.isUpperInclusive();
     }
 
+    public int geLowerBound() {
+      return _lowerBound;
+    }
+
+    public int getUpperBound() {
+      return _upperBound;
+    }
+
     @Override
     public Predicate.Type getPredicateType() {
       return Predicate.Type.RANGE;
@@ -307,7 +315,7 @@ public class RangePredicateEvaluatorFactory {
     }
   }
 
-  private static final class LongRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
+  public static final class LongRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
     final long _lowerBound;
     final long _upperBound;
     final boolean _lowerInclusive;
@@ -325,6 +333,14 @@ public class RangePredicateEvaluatorFactory {
       _upperInclusive = upperUnbounded || rangePredicate.isUpperInclusive();
     }
 
+    public long geLowerBound() {
+      return _lowerBound;
+    }
+
+    public long getUpperBound() {
+      return _upperBound;
+    }
+
     @Override
     public Predicate.Type getPredicateType() {
       return Predicate.Type.RANGE;
@@ -352,7 +368,7 @@ public class RangePredicateEvaluatorFactory {
     }
   }
 
-  private static final class FloatRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
+  public static final class FloatRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
     final float _lowerBound;
     final float _upperBound;
     final boolean _lowerInclusive;
@@ -370,6 +386,14 @@ public class RangePredicateEvaluatorFactory {
       _upperInclusive = upperUnbounded || rangePredicate.isUpperInclusive();
     }
 
+    public float geLowerBound() {
+      return _lowerBound;
+    }
+
+    public float getUpperBound() {
+      return _upperBound;
+    }
+
     @Override
     public Predicate.Type getPredicateType() {
       return Predicate.Type.RANGE;
@@ -397,7 +421,7 @@ public class RangePredicateEvaluatorFactory {
     }
   }
 
-  private static final class DoubleRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
+  public static final class DoubleRawValueBasedRangePredicateEvaluator extends BaseRawValueBasedPredicateEvaluator {
     final double _lowerBound;
     final double _upperBound;
     final boolean _lowerInclusive;
@@ -415,6 +439,14 @@ public class RangePredicateEvaluatorFactory {
       _upperInclusive = upperUnbounded || rangePredicate.isUpperInclusive();
     }
 
+    public double geLowerBound() {
+      return _lowerBound;
+    }
+
+    public double getUpperBound() {
+      return _upperBound;
+    }
+
     @Override
     public Predicate.Type getPredicateType() {
       return Predicate.Type.RANGE;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/DictionaryBasedInvertedIndexCreator.java
similarity index 82%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/creator/DictionaryBasedInvertedIndexCreator.java
index 5445bea..ff039cd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/DictionaryBasedInvertedIndexCreator.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pinot.core.segment.creator;
 
-import java.io.Closeable;
-import java.io.IOException;
-
-
 /**
  * Support for RoaringBitmap inverted index:
  * <pre>
@@ -42,7 +38,7 @@ import java.io.IOException;
  *
  * <p>To create an inverted index:
  * <ul>
- *   <li>
+ *   <li>R
  *     Construct an instance of <code>InvertedIndexCreator</code>
  *   </li>
  *   <li>
@@ -56,30 +52,20 @@ import java.io.IOException;
  *
  * Support for Lucene based inverted index for text
  */
-public interface InvertedIndexCreator extends Closeable {
+public interface DictionaryBasedInvertedIndexCreator extends InvertedIndexCreator {
 
   /**
-   * For single-valued column, adds the dictionary Id for the next document.
+   * For single-value column, adds the dictionary id for the next document.
    */
   void add(int dictId);
 
   /**
-   * For multi-valued column, adds the dictionary Ids for the next document.
+   * For multi-value column, adds the dictionary ids for the next document.
    */
   void add(int[] dictIds, int length);
 
   /**
-   * Seals the index and flushes it to disk.
-   */
-  void seal()
-      throws IOException;
-
-  /**
-   * Add a row (represented by an object) with a given docId
-   * @param document document/object to add
-   * @param docId object's docId
-   *
-   * Currently this is
+   * For text column, adds the document of the given document id.
    */
   void addDoc(Object document, int docId);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
index 5445bea..348e47d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/InvertedIndexCreator.java
@@ -23,63 +23,13 @@ import java.io.IOException;
 
 
 /**
- * Support for RoaringBitmap inverted index:
- * <pre>
- * Layout for RoaringBitmap inverted index:
- * |-------------------------------------------------------------------------|
- * |                    Start offset of 1st bitmap                           |
- * |    End offset of 1st bitmap (exclusive) / Start offset of 2nd bitmap    |
- * |                                   ...                                   |
- * | End offset of 2nd last bitmap (exclusive) / Start offset of last bitmap |
- * |                  End offset of last bitmap (exclusive)                  |
- * |-------------------------------------------------------------------------|
- * |                           Data for 1st bitmap                           |
- * |                           Data for 2nd bitmap                           |
- * |                                   ...                                   |
- * |                           Data for last bitmap                          |
- * |-------------------------------------------------------------------------|
- * </pre>
- *
- * <p>To create an inverted index:
- * <ul>
- *   <li>
- *     Construct an instance of <code>InvertedIndexCreator</code>
- *   </li>
- *   <li>
- *     Call add() for each docId in sequence starting with 0 to add dictId (dictIds for multi-valued column) into the
- *     creator
- *   </li>
- *   <li>
- *     Call seal() after all dictIds have been added
- *   </li>
- * </ul>
- *
- * Support for Lucene based inverted index for text
+ * Marker interface for all inverted index creators.
  */
 public interface InvertedIndexCreator extends Closeable {
 
   /**
-   * For single-valued column, adds the dictionary Id for the next document.
-   */
-  void add(int dictId);
-
-  /**
-   * For multi-valued column, adds the dictionary Ids for the next document.
-   */
-  void add(int[] dictIds, int length);
-
-  /**
    * Seals the index and flushes it to disk.
    */
   void seal()
       throws IOException;
-
-  /**
-   * Add a row (represented by an object) with a given docId
-   * @param document document/object to add
-   * @param docId object's docId
-   *
-   * Currently this is
-   */
-  void addDoc(Object document, int docId);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RawValueBasedInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RawValueBasedInvertedIndexCreator.java
new file mode 100644
index 0000000..8e0accd
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/RawValueBasedInvertedIndexCreator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+public interface RawValueBasedInvertedIndexCreator extends InvertedIndexCreator {
+
+  /**
+   * For single-value column, adds the int value for the next document.
+   */
+  void add(int value);
+
+  /**
+   * For multi-value column, adds the int values for the next document.
+   */
+  void add(int[] values, int length);
+
+  /**
+   * For single-value column, adds the long value for the next document.
+   */
+  void add(long value);
+
+  /**
+   * For multi-value column, adds the long values for the next document.
+   */
+  void add(long[] values, int length);
+
+  /**
+   * For single-value column, adds the float value for the next document.
+   */
+  void add(float value);
+
+  /**
+   * For multi-value column, adds the float values for the next document.
+   */
+  void add(float[] values, int length);
+
+  /**
+   * For single-value column, adds the double value for the next document.
+   */
+  void add(double value);
+
+  /**
+   * For multi-value column, adds the double values for the next document.
+   */
+  void add(double[] values, int length);
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 233391b..4489dc8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -38,8 +38,8 @@ import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.core.io.util.PinotDataBitSet;
 import org.apache.pinot.core.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.core.segment.creator.ColumnIndexCreationInfo;
+import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.ForwardIndexCreator;
-import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.SegmentCreator;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationInfo;
 import org.apache.pinot.core.segment.creator.TextIndexType;
@@ -82,8 +82,8 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
   private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
   private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
-  private Map<String, InvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
-  private Map<String, InvertedIndexCreator> _textIndexCreatorMap = new HashMap<>();
+  private Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
+  private Map<String, DictionaryBasedInvertedIndexCreator> _textIndexCreatorMap = new HashMap<>();
   private Map<String, NullValueVectorCreator> _nullValueVectorCreatorMap = new HashMap<>();
   private String segmentName;
   private Schema schema;
@@ -324,7 +324,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           int dictId = dictionaryCreator.indexOfSV(columnValueToIndex);
           // store the docID -> dictID mapping in forward index
           forwardIndexCreator.putDictId(dictId);
-          InvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+          DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
           if (invertedIndexCreator != null) {
             // if inverted index enabled during segment creation,
             // then store dictID -> docID mapping in inverted index
@@ -358,15 +358,15 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
         }
         // text-index enabled SV column
         if (_textIndexColumns.contains(columnName)) {
-          InvertedIndexCreator textInvertedIndexCreator = _textIndexCreatorMap.get(columnName);
+          DictionaryBasedInvertedIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
           // add the column value to lucene index
-          textInvertedIndexCreator.addDoc(columnValueToIndex, docIdCounter);
+          textIndexCreator.addDoc(columnValueToIndex, docIdCounter);
         }
       } else {
         // MV column (always dictionary encoded)
         int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
         forwardIndexCreator.putDictIdMV(dictIds);
-        InvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
+        DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
         if (invertedIndexCreator != null) {
           invertedIndexCreator.add(dictIds, dictIds.length);
         }
@@ -390,9 +390,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   @Override
   public void seal()
       throws ConfigurationException, IOException {
-    for (InvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
+    for (DictionaryBasedInvertedIndexCreator invertedIndexCreator : _invertedIndexCreatorMap.values()) {
       invertedIndexCreator.seal();
     }
+    for (DictionaryBasedInvertedIndexCreator textIndexCreator : _textIndexCreatorMap.values()) {
+      textIndexCreator.seal();
+    }
     for (NullValueVectorCreator nullValueVectorCreator : _nullValueVectorCreatorMap.values()) {
       nullValueVectorCreator.seal();
     }
@@ -617,6 +620,6 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       throws IOException {
     FileUtils.close(Iterables
         .concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
-            _nullValueVectorCreatorMap.values(), _textIndexCreatorMap.values()));
+            _textIndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
index 1bc994b..7ef6151 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java
@@ -26,15 +26,15 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
- * Implementation of {@link InvertedIndexCreator} that uses off-heap memory.
+ * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses off-heap memory.
  * <p>We use 2 passes to create the inverted index.
  * <ul>
  *   <li>
@@ -52,7 +52,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
  * </ul>
  * <p>Based on the number of values we need to store, we use direct memory or MMap file to allocate the buffer.
  */
-public final class OffHeapBitmapInvertedIndexCreator implements InvertedIndexCreator {
+public final class OffHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
   // Use MMapBuffer if the value buffer size is larger than 2G
   private static final int NUM_VALUES_THRESHOLD_FOR_MMAP_BUFFER = 500_000_000;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
index 31d7f7a..a1b322f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java
@@ -25,15 +25,15 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 
 /**
- * Implementation of {@link InvertedIndexCreator} that uses on-heap memory.
+ * Implementation of {@link DictionaryBasedInvertedIndexCreator} that uses on-heap memory.
  */
-public final class OnHeapBitmapInvertedIndexCreator implements InvertedIndexCreator {
+public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator {
   private final File _invertedIndexFile;
   private final MutableRoaringBitmap[] _bitmaps;
   private int _nextDocId;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
index 8e1934d..84f0619 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/RangeIndexCreator.java
@@ -33,7 +33,8 @@ import java.util.ArrayList;
 import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.query.utils.Pair;
-import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.RawValueBasedInvertedIndexCreator;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -45,28 +46,22 @@ import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BIT
 
 
 /**
- * Implementation of {@link InvertedIndexCreator} that uses off-heap memory.
+ * Range index creator that uses off-heap memory.
  * <p>We use 2 passes to create the range index.
  * <ul>
- *
- *   <li>
- *     A
- *   </li>
  *   <li>
- *     In the first pass (adding values phase), when add() method is called, store the raw values into the
- *     value buffer (for multi-valued column we flatten the values).
- *     We also store the corresponding docId in docIdBuffer which will be sorted in the next phase based on the value in valueBuffer.
- *
+ *     In the first pass (adding values phase), when add() method is called, store the raw values into the value buffer
+ *     (for multi-valued column we flatten the values). We also store the corresponding docId in docIdBuffer which will
+ *     be sorted in the next phase based on the value in valueBuffer.
  *   </li>
  *   <li>
- *     In the second pass (processing values phase), when seal() method is called, we sort the docIdBuffer based on the value in valueBuffer.
- *     We then iterate over the sorted docIdBuffer and create ranges such that each range comprises of _numDocsPerRange.
- *     While
+ *     In the second pass (processing values phase), when seal() method is called, we sort the docIdBuffer based on the
+ *     value in valueBuffer. We then iterate over the sorted docIdBuffer and create ranges such that each range
+ *     comprises of _numDocsPerRange.
  *   </li>
  * </ul>
  */
-public final class RangeIndexCreator implements InvertedIndexCreator {
-
+public final class RangeIndexCreator implements DictionaryBasedInvertedIndexCreator, RawValueBasedInvertedIndexCreator {
   private static final Logger LOGGER = LoggerFactory.getLogger(RangeIndexCreator.class);
 
   //This will dump the content of temp buffers and ranges
@@ -165,21 +160,71 @@ public final class RangeIndexCreator implements InvertedIndexCreator {
   }
 
   @Override
-  public void add(int dictId) {
-    _numberValueBuffer.put(_nextDocId, dictId);
+  public void add(int value) {
+    _numberValueBuffer.put(_nextDocId, value);
+    _docIdBuffer.put(_nextDocId, _nextDocId);
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(int[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      _numberValueBuffer.put(_nextValueId, values[i]);
+      _docIdBuffer.put(_nextValueId, _nextDocId);
+      _nextValueId++;
+    }
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(long value) {
+    _numberValueBuffer.put(_nextDocId, value);
+    _docIdBuffer.put(_nextDocId, _nextDocId);
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(long[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      _numberValueBuffer.put(_nextValueId, values[i]);
+      _docIdBuffer.put(_nextValueId, _nextDocId);
+      _nextValueId++;
+    }
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(float value) {
+    _numberValueBuffer.put(_nextDocId, value);
     _docIdBuffer.put(_nextDocId, _nextDocId);
-    _nextDocId = _nextDocId + 1;
+    _nextDocId++;
   }
 
   @Override
-  public void add(int[] dictIds, int length) {
+  public void add(float[] values, int length) {
     for (int i = 0; i < length; i++) {
-      int dictId = dictIds[i];
-      _numberValueBuffer.put(_nextValueId, dictId);
+      _numberValueBuffer.put(_nextValueId, values[i]);
       _docIdBuffer.put(_nextValueId, _nextDocId);
-      _nextValueId = _nextValueId + 1;
+      _nextValueId++;
     }
-    _nextDocId = _nextDocId + 1;
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(double value) {
+    _numberValueBuffer.put(_nextDocId, value);
+    _docIdBuffer.put(_nextDocId, _nextDocId);
+    _nextDocId++;
+  }
+
+  @Override
+  public void add(double[] values, int length) {
+    for (int i = 0; i < length; i++) {
+      _numberValueBuffer.put(_nextValueId, values[i]);
+      _docIdBuffer.put(_nextValueId, _nextDocId);
+      _nextValueId++;
+    }
+    _nextDocId++;
   }
 
   @Override
@@ -431,7 +476,7 @@ public final class RangeIndexCreator implements InvertedIndexCreator {
     }
   }
 
-  interface NumberValueBuffer {
+  private interface NumberValueBuffer {
 
     void put(int position, Number value);
 
@@ -440,23 +485,21 @@ public final class RangeIndexCreator implements InvertedIndexCreator {
     int compare(Number val1, Number val2);
   }
 
-  class IntValueBuffer implements NumberValueBuffer {
-
-    private PinotDataBuffer _buffer;
-
-    IntValueBuffer(PinotDataBuffer buffer) {
+  private static class IntValueBuffer implements NumberValueBuffer {
+    private final PinotDataBuffer _dataBuffer;
 
-      _buffer = buffer;
+    IntValueBuffer(PinotDataBuffer dataBuffer) {
+      _dataBuffer = dataBuffer;
     }
 
     @Override
     public void put(int position, Number value) {
-      _buffer.putInt(position << 2, value.intValue());
+      _dataBuffer.putInt(position << 2, value.intValue());
     }
 
     @Override
     public Number get(int position) {
-      return _buffer.getInt(position << 2);
+      return _dataBuffer.getInt(position << 2);
     }
 
     @Override
@@ -465,23 +508,21 @@ public final class RangeIndexCreator implements InvertedIndexCreator {
     }
   }
 
-  class LongValueBuffer implements NumberValueBuffer {
-
-    private PinotDataBuffer _buffer;
-
-    LongValueBuffer(PinotDataBuffer buffer) {
+  private static class LongValueBuffer implements NumberValueBuffer {
+    private final PinotDataBuffer _dataBuffer;
 
-      _buffer = buffer;
+    LongValueBuffer(PinotDataBuffer dataBuffer) {
+      _dataBuffer = dataBuffer;
     }
 
     @Override
     public void put(int position, Number value) {
-      _buffer.putInt(position << 3, value.intValue());
+      _dataBuffer.putLong(position << 3, value.longValue());
     }
 
     @Override
     public Number get(int position) {
-      return _buffer.getInt(position << 3);
+      return _dataBuffer.getLong(position << 3);
     }
 
     @Override
@@ -490,53 +531,49 @@ public final class RangeIndexCreator implements InvertedIndexCreator {
     }
   }
 
-  class FloatValueBuffer implements NumberValueBuffer {
-
-    private PinotDataBuffer _buffer;
-
-    FloatValueBuffer(PinotDataBuffer buffer) {
+  private static class FloatValueBuffer implements NumberValueBuffer {
+    private final PinotDataBuffer _dataBuffer;
 
-      _buffer = buffer;
+    FloatValueBuffer(PinotDataBuffer dataBuffer) {
+      _dataBuffer = dataBuffer;
     }
 
     @Override
     public void put(int position, Number value) {
-      _buffer.putInt(position << 2, value.intValue());
+      _dataBuffer.putFloat(position << 2, value.intValue());
     }
 
     @Override
     public Number get(int position) {
-      return _buffer.getInt(position << 2);
+      return _dataBuffer.getFloat(position << 2);
     }
 
     @Override
     public int compare(Number val1, Number val2) {
-      return Long.compare(val1.longValue(), val2.longValue());
+      return Float.compare(val1.floatValue(), val2.floatValue());
     }
   }
 
-  class DoubleValueBuffer implements NumberValueBuffer {
-
-    private PinotDataBuffer _buffer;
-
-    DoubleValueBuffer(PinotDataBuffer buffer) {
+  private static class DoubleValueBuffer implements NumberValueBuffer {
+    private final PinotDataBuffer _dataBuffer;
 
-      _buffer = buffer;
+    DoubleValueBuffer(PinotDataBuffer dataBuffer) {
+      _dataBuffer = dataBuffer;
     }
 
     @Override
     public void put(int position, Number value) {
-      _buffer.putInt(position << 3, value.intValue());
+      _dataBuffer.putDouble(position << 3, value.doubleValue());
     }
 
     @Override
     public Number get(int position) {
-      return _buffer.getInt(position << 3);
+      return _dataBuffer.getDouble(position << 3);
     }
 
     @Override
     public int compare(Number val1, Number val2) {
-      return Long.compare(val1.longValue(), val2.longValue());
+      return Double.compare(val1.doubleValue(), val2.doubleValue());
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneTextIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneTextIndexCreator.java
index 4c19cc2..ba071df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneTextIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/text/LuceneTextIndexCreator.java
@@ -21,14 +21,8 @@ package org.apache.pinot.core.segment.creator.impl.inv.text;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
-import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.CharArraySet;
-import org.apache.lucene.analysis.core.LowerCaseFilterFactory;
-import org.apache.lucene.analysis.core.StopFilterFactory;
-import org.apache.lucene.analysis.custom.CustomAnalyzer;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.analysis.standard.StandardTokenizerFactory;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StoredField;
@@ -37,7 +31,7 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
-import org.apache.pinot.core.segment.creator.InvertedIndexCreator;
+import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator;
 import org.slf4j.LoggerFactory;
 
 
@@ -46,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * Used for both offline from {@link org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator}
  * and realtime from {@link org.apache.pinot.core.realtime.impl.invertedindex.RealtimeLuceneTextIndexReader}
  */
-public class LuceneTextIndexCreator implements InvertedIndexCreator {
+public class LuceneTextIndexCreator implements DictionaryBasedInvertedIndexCreator {
   private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(LuceneTextIndexCreator.class);
   // TODO: make buffer size configurable choosing a default value based on the heap usage results in design doc
   private static final int LUCENE_INDEX_MAX_BUFFER_SIZE_MB = 500;
@@ -58,14 +52,10 @@ public class LuceneTextIndexCreator implements InvertedIndexCreator {
   private final Directory _indexDirectory;
   private final IndexWriter _indexWriter;
 
-  public static final CharArraySet ENGLISH_STOP_WORDS_SET =
-      new CharArraySet(Arrays.asList(
-          "a", "an", "and", "are", "as", "at", "be", "but", "by",
-          "for", "if", "in", "into", "is", "it",
-          "no", "not", "of", "on", "or", "such",
-          "that", "the", "their", "then", "than", "there", "these",
-          "they", "this", "to", "was", "will", "with", "those"
-      ), true);
+  public static final CharArraySet ENGLISH_STOP_WORDS_SET = new CharArraySet(Arrays
+      .asList("a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if", "in", "into", "is", "it", "no",
+          "not", "of", "on", "or", "such", "that", "the", "their", "then", "than", "there", "these", "they", "this",
+          "to", "was", "will", "with", "those"), true);
 
   /**
    * Called by {@link org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator}
@@ -80,8 +70,8 @@ public class LuceneTextIndexCreator implements InvertedIndexCreator {
    *               Once {@link org.apache.pinot.core.segment.creator.impl.SegmentColumnarIndexCreator}
    *               finishes indexing all documents/rows for the segment, we need to commit and close
    *               the Lucene index which will internally persist the index on disk, do the necessary
-   *               resource cleanup etc. We commit during {@link InvertedIndexCreator#seal()}
-   *               and close during {@link InvertedIndexCreator#close()}.
+   *               resource cleanup etc. We commit during {@link DictionaryBasedInvertedIndexCreator#seal()}
+   *               and close during {@link DictionaryBasedInvertedIndexCreator#close()}.
    *               This lucene index writer is used by both offline and realtime (both during
    *               indexing in-memory MutableSegment and later during conversion to offline).
    *               Since realtime segment conversion is again going to go through the offline
@@ -104,7 +94,8 @@ public class LuceneTextIndexCreator implements InvertedIndexCreator {
       indexWriterConfig.setCommitOnClose(commit);
       _indexWriter = new IndexWriter(_indexDirectory, indexWriterConfig);
     } catch (Exception e) {
-      LOGGER.error("Failed to instantiate Lucene text index creator for column {}, exception {}", column, e.getMessage());
+      LOGGER
+          .error("Failed to instantiate Lucene text index creator for column {}, exception {}", column, e.getMessage());
       throw new RuntimeException(e);
     }
   }
@@ -122,7 +113,8 @@ public class LuceneTextIndexCreator implements InvertedIndexCreator {
     try {
       _indexWriter.addDocument(docToIndex);
     } catch (Exception e) {
-      LOGGER.error("Failure while adding a new document to index for column {}, exception {}", _textColumn, e.getMessage());
+      LOGGER.error("Failure while adding a new document to index for column {}, exception {}", _textColumn,
+          e.getMessage());
       throw new RuntimeException(e);
     }
   }
@@ -153,7 +145,8 @@ public class LuceneTextIndexCreator implements InvertedIndexCreator {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close()
+      throws IOException {
     try {
       // based on the commit flag set in IndexWriterConfig, this will decide to commit or not
       _indexWriter.close();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
index 4f1bce0..fedc6e5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/invertedindex/RangeIndexHandler.java
@@ -34,6 +34,7 @@ import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext;
 import org.apache.pinot.core.segment.index.readers.forward.FixedBitMVForwardIndexReader;
 import org.apache.pinot.core.segment.index.readers.forward.FixedBitSVForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 import org.apache.pinot.core.segment.store.ColumnIndexType;
 import org.apache.pinot.core.segment.store.SegmentDirectory;
@@ -62,7 +63,7 @@ public class RangeIndexHandler {
     // Only create range index on dictionary-encoded unsorted columns
     for (String column : indexLoadingConfig.getRangeIndexColumns()) {
       ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column);
-      if (columnMetadata != null && !columnMetadata.isSorted() && columnMetadata.hasDictionary()) {
+      if (columnMetadata != null && !columnMetadata.isSorted()) {
         _rangeIndexColumns.add(columnMetadata);
       }
     }
@@ -102,7 +103,11 @@ public class RangeIndexHandler {
 
     // Create new range index for the column.
     LOGGER.info("Creating new range index for segment: {}, column: {}", _segmentName, column);
-    handleDictionaryBasedColumn(columnMetadata);
+    if (columnMetadata.hasDictionary()) {
+      handleDictionaryBasedColumn(columnMetadata);
+    } else {
+      handleNonDictionaryBasedColumn(columnMetadata);
+    }
 
     // For v3, write the generated range index file into the single file and remove it.
     if (_segmentVersion == SegmentVersion.v3) {
@@ -118,25 +123,97 @@ public class RangeIndexHandler {
   private void handleDictionaryBasedColumn(ColumnMetadata columnMetadata)
       throws IOException {
     int numDocs = columnMetadata.getTotalDocs();
-    try (RangeIndexCreator creator = new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
-        FieldSpec.DataType.INT, -1, -1, numDocs, columnMetadata.getTotalNumberOfEntries())) {
-      try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
-          ForwardIndexReaderContext readerContext = forwardIndexReader.createContext()) {
-        if (columnMetadata.isSingleValue()) {
-          // Single-value column.
-          for (int i = 0; i < numDocs; i++) {
-            creator.add(forwardIndexReader.getDictId(i, readerContext));
-          }
-        } else {
-          // Multi-value column.
-          int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()];
-          for (int i = 0; i < numDocs; i++) {
-            int length = forwardIndexReader.getDictIdMV(i, dictIds, readerContext);
-            creator.add(dictIds, length);
-          }
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        RangeIndexCreator rangeIndexCreator = new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            FieldSpec.DataType.INT, -1, -1, numDocs, columnMetadata.getTotalNumberOfEntries())) {
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column
+        for (int i = 0; i < numDocs; i++) {
+          rangeIndexCreator.add(forwardIndexReader.getDictId(i, readerContext));
+        }
+      } else {
+        // Multi-value column
+        int[] dictIds = new int[columnMetadata.getMaxNumberOfMultiValues()];
+        for (int i = 0; i < numDocs; i++) {
+          int length = forwardIndexReader.getDictIdMV(i, dictIds, readerContext);
+          rangeIndexCreator.add(dictIds, length);
         }
-        creator.seal();
       }
+      rangeIndexCreator.seal();
+    }
+  }
+
+  private void handleNonDictionaryBasedColumn(ColumnMetadata columnMetadata)
+      throws IOException {
+    int numDocs = columnMetadata.getTotalDocs();
+    try (ForwardIndexReader forwardIndexReader = getForwardIndexReader(columnMetadata, _segmentWriter);
+        ForwardIndexReaderContext readerContext = forwardIndexReader.createContext();
+        RangeIndexCreator rangeIndexCreator = new RangeIndexCreator(_indexDir, columnMetadata.getFieldSpec(),
+            columnMetadata.getDataType(), -1, -1, numDocs, columnMetadata.getTotalNumberOfEntries())) {
+      if (columnMetadata.isSingleValue()) {
+        // Single-value column.
+        switch (columnMetadata.getDataType()) {
+          case INT:
+            for (int i = 0; i < numDocs; i++) {
+              rangeIndexCreator.add(forwardIndexReader.getInt(i, readerContext));
+            }
+            break;
+          case LONG:
+            for (int i = 0; i < numDocs; i++) {
+              rangeIndexCreator.add(forwardIndexReader.getLong(i, readerContext));
+            }
+            break;
+          case FLOAT:
+            for (int i = 0; i < numDocs; i++) {
+              rangeIndexCreator.add(forwardIndexReader.getFloat(i, readerContext));
+            }
+            break;
+          case DOUBLE:
+            for (int i = 0; i < numDocs; i++) {
+              rangeIndexCreator.add(forwardIndexReader.getDouble(i, readerContext));
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + columnMetadata.getDataType());
+        }
+      } else {
+        // Multi-value column
+        int maxNumValuesPerMVEntry = columnMetadata.getMaxNumberOfMultiValues();
+        switch (columnMetadata.getDataType()) {
+          case INT:
+            int[] intValues = new int[maxNumValuesPerMVEntry];
+            for (int i = 0; i < numDocs; i++) {
+              int length = forwardIndexReader.getIntMV(i, intValues, readerContext);
+              rangeIndexCreator.add(intValues, length);
+            }
+            break;
+          case LONG:
+            long[] longValues = new long[maxNumValuesPerMVEntry];
+            for (int i = 0; i < numDocs; i++) {
+              int length = forwardIndexReader.getLongMV(i, longValues, readerContext);
+              rangeIndexCreator.add(longValues, length);
+            }
+            break;
+          case FLOAT:
+            float[] floatValues = new float[maxNumValuesPerMVEntry];
+            for (int i = 0; i < numDocs; i++) {
+              int length = forwardIndexReader.getFloatMV(i, floatValues, readerContext);
+              rangeIndexCreator.add(floatValues, length);
+            }
+            break;
+          case DOUBLE:
+            double[] doubleValues = new double[maxNumValuesPerMVEntry];
+            for (int i = 0; i < numDocs; i++) {
+              int length = forwardIndexReader.getDoubleMV(i, doubleValues, readerContext);
+              rangeIndexCreator.add(doubleValues, length);
+            }
+            break;
+          default:
+            throw new IllegalStateException("Unsupported data type: " + columnMetadata.getDataType());
+        }
+      }
+      rangeIndexCreator.seal();
     }
   }
 
@@ -147,10 +224,18 @@ public class RangeIndexHandler {
     int numRows = columnMetadata.getTotalDocs();
     int numBitsPerValue = columnMetadata.getBitsPerElement();
     if (columnMetadata.isSingleValue()) {
-      return new FixedBitSVForwardIndexReader(buffer, numRows, numBitsPerValue);
+      if (columnMetadata.hasDictionary()) {
+        return new FixedBitSVForwardIndexReader(buffer, numRows, numBitsPerValue);
+      } else {
+        return new FixedByteChunkSVForwardIndexReader(buffer, columnMetadata.getDataType());
+      }
     } else {
-      return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
-          numBitsPerValue);
+      if (columnMetadata.hasDictionary()) {
+        return new FixedBitMVForwardIndexReader(buffer, numRows, columnMetadata.getTotalNumberOfEntries(),
+            numBitsPerValue);
+      } else {
+        throw new IllegalStateException("Raw index on multi-value column is not supported");
+      }
     }
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
index 0a4c171..7590d33 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/RangeIndexCreatorTest.java
@@ -18,144 +18,366 @@
  */
 package org.apache.pinot.core.segment.index.creator;
 
-import com.google.common.base.Preconditions;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Random;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.core.segment.creator.impl.inv.RangeIndexCreator;
 import org.apache.pinot.core.segment.index.readers.RangeIndexReader;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
-import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.apache.pinot.core.segment.creator.impl.V1Constants.Indexes.BITMAP_RANGE_INDEX_FILE_EXTENSION;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
-/**
- * Class for testing Range index.
- */
 public class RangeIndexCreatorTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "RangeIndexCreatorTest");
+  private static final Random RANDOM = new Random();
+  private static final String COLUMN_NAME = "testColumn";
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    FileUtils.forceMkdir(INDEX_DIR);
+  }
 
   @Test
   public void testInt()
       throws Exception {
-    File indexDir = new File(System.getProperty("java.io.tmpdir") + "/testRangeIndex");
-    indexDir.mkdirs();
-    FieldSpec fieldSpec = new MetricFieldSpec();
-    fieldSpec.setDataType(FieldSpec.DataType.INT);
-    String columnName = "latency";
-    fieldSpec.setName(columnName);
-    int cardinality = 20;
-    int numDocs = 1000;
-    int numValues = 1000;
-    RangeIndexCreator creator =
-        new RangeIndexCreator(indexDir, fieldSpec, FieldSpec.DataType.INT, -1, -1, numDocs, numValues);
-    Random r = new Random();
-    Number[] values = new Number[numValues];
-    for (int i = 0; i < numDocs; i++) {
-      int val = r.nextInt(cardinality);
-      creator.add(val);
-      values[i] = val;
-    }
-    creator.seal();
+    testDataType(DataType.INT);
+  }
 
-    File rangeIndexFile = new File(indexDir, columnName + BITMAP_RANGE_INDEX_FILE_EXTENSION);
-    //TEST THE BUFFER FORMAT
+  @Test
+  public void testLong()
+      throws Exception {
+    testDataType(DataType.LONG);
+  }
 
-    testRangeIndexBufferFormat(values, rangeIndexFile);
+  @Test
+  public void testFloat()
+      throws Exception {
+    testDataType(DataType.FLOAT);
+  }
 
-    //TEST USING THE READER
-    PinotDataBuffer pinotDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile);
-    RangeIndexReader rangeIndexReader = new RangeIndexReader(pinotDataBuffer);
-    Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
-    for (int rangeId = 0; rangeId < rangeStartArray.length; rangeId++) {
-      ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
-      for (int docId : bitmap.toArray()) {
-        checkInt(rangeStartArray, rangeId, values, docId);
-      }
-    }
+  @Test
+  public void testDouble()
+      throws Exception {
+    testDataType(DataType.DOUBLE);
   }
 
-  private void checkInt(Number[] rangeStartArray, int rangeId, Number[] values, int docId) {
-    if (rangeId != rangeStartArray.length - 1) {
-      Assert.assertTrue(
-          rangeStartArray[rangeId].intValue() <= values[docId].intValue() && values[docId].intValue() < rangeStartArray[
-              rangeId + 1].intValue(), "rangestart:" + rangeStartArray[rangeId] + " value:" + values[docId]);
-    } else {
-      Assert.assertTrue(rangeStartArray[rangeId].intValue() <= values[docId].intValue(),
-          "rangestart:" + rangeStartArray[rangeId] + " value:" + values[docId]);
+  @Test
+  public void testIntMV()
+      throws Exception {
+    testDataTypeMV(DataType.INT);
+  }
+
+  @Test
+  public void testLongMV()
+      throws Exception {
+    testDataTypeMV(DataType.LONG);
+  }
+
+  @Test
+  public void testFloatMV()
+      throws Exception {
+    testDataTypeMV(DataType.FLOAT);
+  }
+
+  @Test
+  public void testDoubleMV()
+      throws Exception {
+    testDataTypeMV(DataType.DOUBLE);
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+
+  private void testDataType(DataType dataType)
+      throws IOException {
+    FieldSpec fieldSpec = new DimensionFieldSpec(COLUMN_NAME, dataType, true);
+    int numDocs = 1000;
+    Number[] values = new Number[numDocs];
+
+    try (RangeIndexCreator creator = new RangeIndexCreator(INDEX_DIR, fieldSpec, dataType, -1, -1, numDocs, numDocs)) {
+      addDataToIndexer(dataType, numDocs, 1, creator, values);
+      creator.seal();
+    }
+
+    File rangeIndexFile = new File(INDEX_DIR, COLUMN_NAME + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile)) {
+      RangeIndexReader rangeIndexReader = new RangeIndexReader(dataBuffer);
+      Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
+      for (int rangeId = 0; rangeId < rangeStartArray.length; rangeId++) {
+        ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
+        for (int docId : bitmap.toArray()) {
+          checkValueForDocId(dataType, values, rangeStartArray, rangeId, docId, 1);
+        }
+      }
     }
+
+    FileUtils.forceDelete(rangeIndexFile);
   }
 
-  private void testRangeIndexBufferFormat(Number[] values, File rangeIndexFile)
+  private void testDataTypeMV(DataType dataType)
       throws IOException {
-    DataInputStream dis = new DataInputStream(new FileInputStream(rangeIndexFile));
-    int version = dis.readInt();
-    int valueTypeBytesLength = dis.readInt();
+    FieldSpec fieldSpec = new DimensionFieldSpec(COLUMN_NAME, dataType, false);
+    int numDocs = 1000;
+    int numValuesPerMVEntry = 10;
+    int numValues = numDocs * numValuesPerMVEntry;
+    Number[] values = new Number[numValues];
 
-    byte[] valueTypeBytes = new byte[valueTypeBytesLength];
-    dis.read(valueTypeBytes);
-    String name = new String(valueTypeBytes);
-    FieldSpec.DataType dataType = FieldSpec.DataType.valueOf(name);
+    try (
+        RangeIndexCreator creator = new RangeIndexCreator(INDEX_DIR, fieldSpec, dataType, -1, -1, numDocs, numValues)) {
+      addDataToIndexer(dataType, numDocs, numValuesPerMVEntry, creator, values);
+      creator.seal();
+    }
 
-    int numRanges = dis.readInt();
+    File rangeIndexFile = new File(INDEX_DIR, COLUMN_NAME + BITMAP_RANGE_INDEX_FILE_EXTENSION);
+    try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(rangeIndexFile)) {
+      RangeIndexReader rangeIndexReader = new RangeIndexReader(dataBuffer);
+      Number[] rangeStartArray = rangeIndexReader.getRangeStartArray();
+      int numRanges = rangeStartArray.length;
+      for (int rangeId = 0; rangeId < numRanges; rangeId++) {
+        ImmutableRoaringBitmap bitmap = rangeIndexReader.getDocIds(rangeId);
+        for (int docId : bitmap.toArray()) {
+          checkValueForDocId(dataType, values, rangeStartArray, rangeId, docId, numValuesPerMVEntry);
+        }
+      }
+    }
 
-    Number[] rangeStart = new Number[numRanges];
-    Number rangeEnd;
+    FileUtils.forceDelete(rangeIndexFile);
+  }
 
+  private void addDataToIndexer(DataType dataType, int numDocs, int numValuesPerEntry, RangeIndexCreator creator,
+      Number[] values) {
     switch (dataType) {
       case INT:
-        for (int i = 0; i < numRanges; i++) {
-          rangeStart[i] = dis.readInt();
+        if (numValuesPerEntry == 1) {
+          for (int i = 0; i < numDocs; i++) {
+            int value = RANDOM.nextInt();
+            values[i] = value;
+            creator.add(value);
+          }
+        } else {
+          int[] intValues = new int[numValuesPerEntry];
+          for (int i = 0; i < numDocs; i++) {
+            for (int j = 0; j < numValuesPerEntry; j++) {
+              int value = RANDOM.nextInt();
+              intValues[j] = value;
+              values[i * numValuesPerEntry + j] = value;
+            }
+            creator.add(intValues, numValuesPerEntry);
+          }
         }
-        rangeEnd = dis.readInt();
         break;
       case LONG:
-        for (int i = 0; i < numRanges; i++) {
-          rangeStart[i] = dis.readLong();
+        if (numValuesPerEntry == 1) {
+          for (int i = 0; i < numDocs; i++) {
+            long value = RANDOM.nextLong();
+            values[i] = value;
+            creator.add(value);
+          }
+        } else {
+          long[] longValues = new long[numValuesPerEntry];
+          for (int i = 0; i < numDocs; i++) {
+            for (int j = 0; j < numValuesPerEntry; j++) {
+              long value = RANDOM.nextLong();
+              longValues[j] = value;
+              values[i * numValuesPerEntry + j] = value;
+            }
+            creator.add(longValues, numValuesPerEntry);
+          }
         }
-        rangeEnd = dis.readLong();
         break;
       case FLOAT:
-        for (int i = 0; i < numRanges; i++) {
-          rangeStart[i] = dis.readFloat();
+        if (numValuesPerEntry == 1) {
+          for (int i = 0; i < numDocs; i++) {
+            float value = RANDOM.nextFloat();
+            values[i] = value;
+            creator.add(value);
+          }
+        } else {
+          float[] floatValues = new float[numValuesPerEntry];
+          for (int i = 0; i < numDocs; i++) {
+            for (int j = 0; j < numValuesPerEntry; j++) {
+              float value = RANDOM.nextFloat();
+              floatValues[j] = value;
+              values[i * numValuesPerEntry + j] = value;
+            }
+            creator.add(floatValues, numValuesPerEntry);
+          }
         }
-        rangeEnd = dis.readFloat();
         break;
       case DOUBLE:
-        for (int i = 0; i < numRanges; i++) {
-          rangeStart[i] = dis.readDouble();
+        if (numValuesPerEntry == 1) {
+          for (int i = 0; i < numDocs; i++) {
+            double value = RANDOM.nextDouble();
+            values[i] = value;
+            creator.add(value);
+          }
+        } else {
+          double[] doubleValues = new double[numValuesPerEntry];
+          for (int i = 0; i < numDocs; i++) {
+            for (int j = 0; j < numValuesPerEntry; j++) {
+              double value = RANDOM.nextDouble();
+              doubleValues[j] = value;
+              values[i * numValuesPerEntry + j] = value;
+            }
+            creator.add(doubleValues, numValuesPerEntry);
+          }
         }
-        rangeEnd = dis.readDouble();
         break;
+      default:
+        throw new IllegalStateException();
     }
+  }
 
-    long[] rangeBitmapOffsets = new long[numRanges + 1];
-    for (int i = 0; i <= numRanges; i++) {
-      rangeBitmapOffsets[i] = dis.readLong();
-    }
-    ImmutableRoaringBitmap[] bitmaps = new ImmutableRoaringBitmap[numRanges];
-    for (int i = 0; i < numRanges; i++) {
-      long serializedBitmapLength;
-      serializedBitmapLength = rangeBitmapOffsets[i + 1] - rangeBitmapOffsets[i];
-      byte[] bytes = new byte[(int) serializedBitmapLength];
-      dis.read(bytes, 0, (int) serializedBitmapLength);
-      bitmaps[i] = new ImmutableRoaringBitmap(ByteBuffer.wrap(bytes));
-      for (int docId : bitmaps[i].toArray()) {
-        if (i != numRanges - 1) {
-          Assert.assertTrue(
-              rangeStart[i].intValue() <= values[docId].intValue() && values[docId].intValue() < rangeStart[i + 1]
-                  .intValue(), "rangestart:" + rangeStart[i] + " value:" + values[docId]);
+  private void checkValueForDocId(DataType dataType, Number[] values, Number[] rangeStartArray, int rangeId, int docId,
+      int numValuesPerEntry) {
+    switch (dataType) {
+      case INT:
+        if (numValuesPerEntry == 1) {
+          checkInt(rangeStartArray, rangeId, values[docId].intValue());
         } else {
-          Assert.assertTrue(rangeStart[i].intValue() <= values[docId].intValue());
+          checkIntMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+        }
+        break;
+      case LONG:
+        if (numValuesPerEntry == 1) {
+          checkLong(rangeStartArray, rangeId, values[docId].longValue());
+        } else {
+          checkLongMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+        }
+        break;
+      case FLOAT:
+        if (numValuesPerEntry == 1) {
+          checkFloat(rangeStartArray, rangeId, values[docId].floatValue());
+        } else {
+          checkFloatMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+        }
+        break;
+      case DOUBLE:
+        if (numValuesPerEntry == 1) {
+          checkDouble(rangeStartArray, rangeId, values[docId].doubleValue());
+        } else {
+          checkDoubleMV(rangeStartArray, rangeId, values, docId, numValuesPerEntry);
+        }
+        break;
+      default:
+        throw new IllegalStateException();
+    }
+  }
+
+  private void checkInt(Number[] rangeStartArray, int rangeId, int value) {
+    assertTrue(rangeStartArray[rangeId].intValue() <= value);
+    if (rangeId != rangeStartArray.length - 1) {
+      assertTrue(value < rangeStartArray[rangeId + 1].intValue());
+    }
+  }
+
+  private void checkIntMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId, int numValuesPerMVEntry) {
+    if (rangeId != rangeStartArray.length - 1) {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].intValue() <= values[docId * numValuesPerMVEntry + i].intValue()
+            && values[docId * numValuesPerMVEntry + i].intValue() < rangeStartArray[rangeId + 1].intValue()) {
+          return;
+        }
+      }
+    } else {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].intValue() <= values[docId * numValuesPerMVEntry + i].intValue()) {
+          return;
+        }
+      }
+    }
+    fail();
+  }
+
+  private void checkLong(Number[] rangeStartArray, int rangeId, long value) {
+    assertTrue(rangeStartArray[rangeId].longValue() <= value);
+    if (rangeId != rangeStartArray.length - 1) {
+      assertTrue(value < rangeStartArray[rangeId + 1].longValue());
+    }
+  }
+
+  private void checkLongMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId, int numValuesPerMVEntry) {
+    if (rangeId != rangeStartArray.length - 1) {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].longValue() <= values[docId * numValuesPerMVEntry + i].longValue()
+            && values[docId * numValuesPerMVEntry + i].longValue() < rangeStartArray[rangeId + 1].longValue()) {
+          return;
+        }
+      }
+    } else {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].longValue() <= values[docId * numValuesPerMVEntry + i].longValue()) {
+          return;
+        }
+      }
+    }
+    fail();
+  }
+
+  private void checkFloat(Number[] rangeStartArray, int rangeId, float value) {
+    assertTrue(rangeStartArray[rangeId].floatValue() <= value);
+    if (rangeId != rangeStartArray.length - 1) {
+      assertTrue(value < rangeStartArray[rangeId + 1].floatValue());
+    }
+  }
+
+  private void checkFloatMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId,
+      int numValuesPerMVEntry) {
+    if (rangeId != rangeStartArray.length - 1) {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].floatValue() <= values[docId * numValuesPerMVEntry + i].floatValue()
+            && values[docId * numValuesPerMVEntry + i].floatValue() < rangeStartArray[rangeId + 1].floatValue()) {
+          return;
+        }
+      }
+    } else {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].floatValue() <= values[docId * numValuesPerMVEntry + i].floatValue()) {
+          return;
+        }
+      }
+    }
+    fail();
+  }
+
+  private void checkDouble(Number[] rangeStartArray, int rangeId, double value) {
+    assertTrue(rangeStartArray[rangeId].doubleValue() <= value);
+    if (rangeId != rangeStartArray.length - 1) {
+      assertTrue(value < rangeStartArray[rangeId + 1].doubleValue());
+    }
+  }
+
+  private void checkDoubleMV(Number[] rangeStartArray, int rangeId, Number[] values, int docId,
+      int numValuesPerMVEntry) {
+    if (rangeId != rangeStartArray.length - 1) {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].doubleValue() <= values[docId * numValuesPerMVEntry + i].doubleValue()
+            && values[docId * numValuesPerMVEntry + i].doubleValue() < rangeStartArray[rangeId + 1].doubleValue()) {
+          return;
+        }
+      }
+    } else {
+      for (int i = 0; i < numValuesPerMVEntry; i++) {
+        if (rangeStartArray[rangeId].doubleValue() <= values[docId * numValuesPerMVEntry + i].doubleValue()) {
+          return;
         }
       }
     }
+    fail();
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org