You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/05/10 03:47:25 UTC

[incubator-pinot] 01/01: Enhance the support for BYTES data type

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch bytes-enhancement
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1c01c33de775312158fb24aa9f9a3e35231e4ff1
Author: Jackie (Xiaotian) Jiang <xa...@linkedin.com>
AuthorDate: Thu May 9 20:38:52 2019 -0700

    Enhance the support for BYTES data type
    
    1. Add BytesUtils to handle encode/decode for Hex string
    2. Add FieldSpec.DataType.convert() to convert string value to data type
    3. Support Hex string for index(), indexOf(), inRange() in all BYTES dictionaries
    4. Add MutableDictionary.compare(dictId1, dictId2) to help the sort process and support sorting byte[]
    5. Support returning Hex string in group-by for byte[] column
    6. In MutableSegmentImpl, unify the sort for all types, reducing the memory usage especially for STRING and BYTES columns
---
 .../org/apache/pinot/common/data/FieldSpec.java    |  54 +++----
 .../org/apache/pinot/common/utils/BytesUtils.java  |  56 +++++++
 .../pinot/common/utils/primitive/ByteArray.java    |  14 +-
 .../org/apache/pinot/common/data/SchemaTest.java   |   8 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   | 174 ++++-----------------
 .../groupby/DictionaryBasedGroupKeyGenerator.java  |   6 +-
 .../core/query/pruner/AbstractSegmentPruner.java   |  25 ++-
 ...SelectionSingleValueColumnWithDictIterator.java |   4 +-
 .../iterator/StringSelectionColumnIterator.java    |   5 +-
 .../converter/stats/RealtimeColumnStatistics.java  |  44 +++---
 .../dictionary/BaseOffHeapMutableDictionary.java   |  30 ----
 .../dictionary/BaseOnHeapMutableDictionary.java    |   1 -
 .../dictionary/BytesOffHeapMutableDictionary.java  |  48 ++----
 .../dictionary/BytesOnHeapMutableDictionary.java   |  48 ++----
 .../dictionary/DoubleOffHeapMutableDictionary.java |  47 +++---
 .../dictionary/DoubleOnHeapMutableDictionary.java  |   5 +
 .../dictionary/FloatOffHeapMutableDictionary.java  |  48 +++---
 .../dictionary/FloatOnHeapMutableDictionary.java   |   5 +
 .../dictionary/IntOffHeapMutableDictionary.java    |  48 +++---
 .../dictionary/IntOnHeapMutableDictionary.java     |   5 +
 .../dictionary/LongOffHeapMutableDictionary.java   |  48 +++---
 .../dictionary/LongOnHeapMutableDictionary.java    |   5 +
 .../impl/dictionary/MutableDictionary.java         |   5 +
 .../dictionary/StringOffHeapMutableDictionary.java |  15 +-
 .../dictionary/StringOnHeapMutableDictionary.java  |  19 +--
 .../core/segment/index/readers/BaseDictionary.java |   1 -
 .../segment/index/readers/BytesDictionary.java     |  10 +-
 .../pinot/core/util/FixedIntArrayOffHeapIdMap.java |   8 +-
 .../impl/dictionary/MutableDictionaryTest.java     |   3 +-
 .../readers/ImmutableDictionaryReaderTest.java     |   1 +
 .../segments/v1/creator/DictionariesTest.java      |  39 ++++-
 31 files changed, 380 insertions(+), 449 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
index 00192a6..4046c70 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/FieldSpec.java
@@ -22,11 +22,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema.Type;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.ConfigKey;
 import org.apache.pinot.common.config.ConfigNodeLifecycleAware;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.EqualityUtils;
 import org.apache.pinot.common.utils.JsonUtils;
 
@@ -168,7 +166,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
    */
   protected static String getStringValue(Object value) {
     if (value instanceof byte[]) {
-      return Hex.encodeHexString((byte[]) value);
+      return BytesUtils.toHexString((byte[]) value);
     } else {
       return value.toString();
     }
@@ -187,26 +185,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
   private static Object getDefaultNullValue(FieldType fieldType, DataType dataType,
       @Nullable String stringDefaultNullValue) {
     if (stringDefaultNullValue != null) {
-      switch (dataType) {
-        case INT:
-          return Integer.valueOf(stringDefaultNullValue);
-        case LONG:
-          return Long.valueOf(stringDefaultNullValue);
-        case FLOAT:
-          return Float.valueOf(stringDefaultNullValue);
-        case DOUBLE:
-          return Double.valueOf(stringDefaultNullValue);
-        case STRING:
-          return stringDefaultNullValue;
-        case BYTES:
-          try {
-            return Hex.decodeHex(stringDefaultNullValue.toCharArray());
-          } catch (DecoderException e) {
-            Utils.rethrowException(e); // Re-throw to avoid handling exceptions in all callers.
-          }
-        default:
-          throw new UnsupportedOperationException("Unsupported data type: " + dataType);
-      }
+      return dataType.convert(stringDefaultNullValue);
     } else {
       switch (fieldType) {
         case METRIC:
@@ -383,10 +362,7 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
    * The <code>DataType</code> enum is used to demonstrate the data type of a field.
    */
   public enum DataType {
-    INT, LONG, FLOAT, DOUBLE, BOOLEAN,
-
-    // Stored as STRING
-    STRING, BYTES;
+    INT, LONG, FLOAT, DOUBLE, BOOLEAN/* Stored as STRING */, STRING, BYTES;
 
     /**
      * Returns the data type stored in Pinot.
@@ -439,6 +415,28 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, ConfigNodeLife
           throw new IllegalStateException("Cannot get number of bytes for: " + this);
       }
     }
+
+    /**
+     * Converts the given string value to this data type; BYTES takes a Hex string, unsupported types throw.
+     */
+    public Object convert(String value) {
+      switch (this) {
+        case INT:
+          return Integer.valueOf(value);
+        case LONG:
+          return Long.valueOf(value);
+        case FLOAT:
+          return Float.valueOf(value);
+        case DOUBLE:
+          return Double.valueOf(value);
+        case STRING:
+          return value;
+        case BYTES:
+          return BytesUtils.toBytes(value);
+        default:
+          throw new UnsupportedOperationException("Unsupported data type: " + this);
+      }
+    }
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/BytesUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/BytesUtils.java
new file mode 100644
index 0000000..63c6396
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/BytesUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
+
+
+public class BytesUtils {
+  private BytesUtils() {
+  }
+
+  /**
+   * Returns the byte[] form of the given value, decoding it first when it is a Hex encoded string.
+   *
+   * @param rawValue byte array or Hex encoded string
+   * @return the same byte array if one is provided, or the decoded byte array for a Hex encoded string
+   */
+  public static byte[] toBytes(Object rawValue) {
+    if (!(rawValue instanceof String)) {
+      return (byte[]) rawValue;
+    }
+    String hexString = (String) rawValue;
+    try {
+      return Hex.decodeHex(hexString.toCharArray());
+    } catch (DecoderException e) {
+      throw new IllegalArgumentException("Value: " + rawValue + " is not Hex encoded", e);
+    }
+  }
+
+  /**
+   * Encodes the given byte array as a Hex string.
+   *
+   * @param bytes byte array
+   * @return Hex encoded string
+   */
+  public static String toHexString(byte[] bytes) {
+    return Hex.encodeHexString(bytes);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/primitive/ByteArray.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/primitive/ByteArray.java
index 6ce798e..ed105c2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/primitive/ByteArray.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/primitive/ByteArray.java
@@ -20,7 +20,7 @@ package org.apache.pinot.common.utils.primitive;
 
 import java.util.Arrays;
 import javax.annotation.Nonnull;
-import org.apache.commons.codec.binary.Hex;
+import org.apache.pinot.common.utils.BytesUtils;
 
 
 /**
@@ -45,19 +45,13 @@ public class ByteArray implements Comparable<ByteArray> {
     return _bytes.length;
   }
 
-  /**
-   * Static utility function to convert a byte[] to Hex string.
-   *
-   * @param bytes byte[] to convert
-   * @return Equivalent Hex String.
-   */
-  public static String toHexString(byte[] bytes) {
-    return Hex.encodeHexString(bytes);
+  public String toHexString() {
+    return BytesUtils.toHexString(_bytes);
   }
 
   @Override
   public String toString() {
-    return toHexString(_bytes);
+    return toHexString();
   }
 
   @Override
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
index cd9756a..573453c 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/SchemaTest.java
@@ -19,12 +19,10 @@
 package org.apache.pinot.common.data;
 
 import java.io.File;
-import java.io.IOException;
 import java.net.URL;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -304,10 +302,10 @@ public class SchemaTest {
 
   @Test
   public void testByteType()
-      throws DecoderException, IOException {
+      throws Exception {
     Schema expectedSchema = new Schema();
     byte[] expectedEmptyDefault = new byte[0];
-    byte[] expectedNonEmptyDefault = Hex.decodeHex("abcd1234".toCharArray());
+    byte[] expectedNonEmptyDefault = BytesUtils.toBytes("abcd1234");
 
     expectedSchema.setSchemaName("test");
     expectedSchema.addField(new MetricFieldSpec("noDefault", FieldSpec.DataType.BYTES));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
index a92ecfc..0c1112c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
@@ -19,8 +19,9 @@
 package org.apache.pinot.core.indexsegment.mutable;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntComparator;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -518,118 +519,6 @@ public class MutableSegmentImpl implements MutableSegment {
     }
   }
 
-  private IntIterator[] getSortedBitmapIntIteratorsForIntColumn(String column) {
-    MutableDictionary dictionary = _dictionaryMap.get(column);
-    int numValues = dictionary.length();
-    IntIterator[] intIterators = new IntIterator[numValues];
-    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-
-    int[] values = new int[numValues];
-    for (int i = 0; i < numValues; i++) {
-      values[i] = (Integer) dictionary.get(i);
-    }
-
-    long start = System.currentTimeMillis();
-    Arrays.sort(values);
-    _logger.info("Spent {}ms sorting int column: {} with cardinality: {}", System.currentTimeMillis() - start, column,
-        numValues);
-
-    for (int i = 0; i < numValues; i++) {
-      intIterators[i] = invertedIndex.getDocIds(dictionary.indexOf(values[i])).getIntIterator();
-    }
-    return intIterators;
-  }
-
-  private IntIterator[] getSortedBitmapIntIteratorsForLongColumn(String column) {
-    MutableDictionary dictionary = _dictionaryMap.get(column);
-    int numValues = dictionary.length();
-    IntIterator[] intIterators = new IntIterator[numValues];
-    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-
-    long[] values = new long[numValues];
-    for (int i = 0; i < numValues; i++) {
-      values[i] = (Long) dictionary.get(i);
-    }
-
-    long start = System.currentTimeMillis();
-    Arrays.sort(values);
-    _logger.info("Spent {}ms sorting long column: {} with cardinality: {}", System.currentTimeMillis() - start, column,
-        numValues);
-
-    for (int i = 0; i < numValues; i++) {
-      intIterators[i] = invertedIndex.getDocIds(dictionary.indexOf(values[i])).getIntIterator();
-    }
-    return intIterators;
-  }
-
-  private IntIterator[] getSortedBitmapIntIteratorsForFloatColumn(String column) {
-    MutableDictionary dictionary = _dictionaryMap.get(column);
-    int numValues = dictionary.length();
-    IntIterator[] intIterators = new IntIterator[numValues];
-    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-
-    float[] values = new float[numValues];
-    for (int i = 0; i < numValues; i++) {
-      values[i] = (Float) dictionary.get(i);
-    }
-
-    long start = System.currentTimeMillis();
-    Arrays.sort(values);
-    _logger.info("Spent {}ms sorting float column: {} with cardinality: {}", System.currentTimeMillis() - start, column,
-        numValues);
-
-    for (int i = 0; i < numValues; i++) {
-      intIterators[i] = invertedIndex.getDocIds(dictionary.indexOf(values[i])).getIntIterator();
-    }
-    return intIterators;
-  }
-
-  private IntIterator[] getSortedBitmapIntIteratorsForDoubleColumn(String column) {
-    MutableDictionary dictionary = _dictionaryMap.get(column);
-    int numValues = dictionary.length();
-    IntIterator[] intIterators = new IntIterator[numValues];
-    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-
-    double[] values = new double[numValues];
-    for (int i = 0; i < numValues; i++) {
-      values[i] = (Double) dictionary.get(i);
-    }
-
-    long start = System.currentTimeMillis();
-    Arrays.sort(values);
-    _logger
-        .info("Spent {}ms sorting double column: {} with cardinality: {}", System.currentTimeMillis() - start, column,
-            numValues);
-
-    for (int i = 0; i < numValues; i++) {
-      intIterators[i] = invertedIndex.getDocIds(dictionary.indexOf(values[i])).getIntIterator();
-    }
-    return intIterators;
-  }
-
-  private IntIterator[] getSortedBitmapIntIteratorsForStringColumn(String column) {
-    MutableDictionary dictionary = _dictionaryMap.get(column);
-    int numValues = dictionary.length();
-    IntIterator[] intIterators = new IntIterator[numValues];
-    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
-
-    String[] values = new String[numValues];
-    for (int i = 0; i < numValues; i++) {
-      values[i] = (String) dictionary.get(i);
-    }
-
-    long start = System.currentTimeMillis();
-    Arrays.sort(values);
-    _logger
-        .info("Spent {}ms sorting string column: {} with cardinality: {}", System.currentTimeMillis() - start, column,
-            numValues);
-
-    for (int i = 0; i < numValues; i++) {
-      intIterators[i] = invertedIndex.getDocIds(dictionary.indexOf(values[i])).getIntIterator();
-    }
-    return intIterators;
-  }
-
   /**
    * Returns the docIds to use for iteration when the data is sorted by the given column.
    * <p>Called only by realtime record reader.
@@ -638,42 +527,39 @@ public class MutableSegmentImpl implements MutableSegment {
    * @return The docIds to use for iteration
    */
   public int[] getSortedDocIdIterationOrderWithSortedColumn(String column) {
-    int[] docIds = new int[_numDocsIndexed];
+    MutableDictionary dictionary = _dictionaryMap.get(column);
+    int numValues = dictionary.length();
 
-    // Get docId iterators that iterate in order on the data
-    IntIterator[] iterators;
-    FieldSpec.DataType dataType = _schema.getFieldSpecFor(column).getDataType();
-    switch (dataType) {
-      case INT:
-        iterators = getSortedBitmapIntIteratorsForIntColumn(column);
-        break;
-      case LONG:
-        iterators = getSortedBitmapIntIteratorsForLongColumn(column);
-        break;
-      case FLOAT:
-        iterators = getSortedBitmapIntIteratorsForFloatColumn(column);
-        break;
-      case DOUBLE:
-        iterators = getSortedBitmapIntIteratorsForDoubleColumn(column);
-        break;
-      case STRING:
-        iterators = getSortedBitmapIntIteratorsForStringColumn(column);
-        break;
-      default:
-        throw new UnsupportedOperationException("Unsupported data type: " + dataType + " for sorted column: " + column);
+    int[] dictIds = new int[numValues];
+    for (int i = 0; i < numValues; i++) {
+      dictIds[i] = i;
     }
+    IntArrays.quickSort(dictIds, new IntComparator() {
+      @Override
+      public int compare(int dictId1, int dictId2) {
+        return dictionary.compare(dictId1, dictId2);
+      }
 
-    // Drain the iterators into the docIds array
-    int i = 0;
-    for (IntIterator iterator : iterators) {
-      while (iterator.hasNext()) {
-        docIds[i++] = iterator.next();
+      @Override
+      public int compare(Integer o1, Integer o2) {
+        return compare((int) o1, (int) o2);
+      }
+    });
+
+    RealtimeInvertedIndexReader invertedIndex = _invertedIndexMap.get(column);
+    int[] docIds = new int[_numDocsIndexed];
+    int docIdIndex = 0;
+    for (int dictId : dictIds) {
+      IntIterator intIterator = invertedIndex.getDocIds(dictId).getIntIterator();
+      while (intIterator.hasNext()) {
+        docIds[docIdIndex++] = intIterator.next();
       }
     }
 
     // Sanity check
-    Preconditions.checkState(_numDocsIndexed == i,
-        "The number of docs indexed: %s is not equal to the number of sorted documents: %s", _numDocsIndexed, i);
+    Preconditions.checkState(_numDocsIndexed == docIdIndex,
+        "The number of documents indexed: %s is not equal to the number of sorted documents: %s", _numDocsIndexed,
+        docIdIndex);
 
     return docIds;
   }
@@ -764,8 +650,8 @@ public class MutableSegmentImpl implements MutableSegment {
       }
       // https://github.com/apache/incubator-pinot/issues/3867
       if (!schema.getDimensionSpec(dimension).isSingleValueField()) {
-        _logger
-            .warn("Metrics aggregation cannot be turned ON in presence of multi-value dimension columns, eg: {}", dimension);
+        _logger.warn("Metrics aggregation cannot be turned ON in presence of multi-value dimension columns, eg: {}",
+            dimension);
         _aggregateMetrics = false;
         break;
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index 543971a..78bcf08 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -421,15 +421,15 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
   private String getGroupKey(int rawKey) {
     // Specialize single group-by column case
     if (_numGroupByExpressions == 1) {
-      return _dictionaries[0].get(rawKey).toString();
+      return _dictionaries[0].getStringValue(rawKey);
     } else {
       int cardinality = _cardinalities[0];
-      StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get(rawKey % cardinality).toString());
+      StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue(rawKey % cardinality));
       rawKey /= cardinality;
       for (int i = 1; i < _numGroupByExpressions; i++) {
         groupKeyBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
         cardinality = _cardinalities[i];
-        groupKeyBuilder.append(_dictionaries[i].get(rawKey % cardinality));
+        groupKeyBuilder.append(_dictionaries[i].getStringValue(rawKey % cardinality));
         rawKey /= cardinality;
       }
       return groupKeyBuilder.toString();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/AbstractSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/AbstractSegmentPruner.java
index a525fb1..9c22eac 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/AbstractSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/AbstractSegmentPruner.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import javax.annotation.Nonnull;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.request.FilterOperator;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
 import org.apache.pinot.core.segment.index.ColumnMetadata;
@@ -87,26 +89,17 @@ public abstract class AbstractSegmentPruner implements SegmentPruner {
    * @param input Input String for which to get the value
    * @param dataType Data type to construct from the String.
    * @return Comparable value of specified data type built from the input String.
-   * @note It is assumed that the 'input' here is a value taken from the query, so this method
-   * should not be used to for other internal purposes.
+   * @apiNote It is assumed that the 'input' here is a value taken from the query, so this method should not be used for
+   * other internal purposes.
    */
   protected static Comparable getValue(@Nonnull String input, @Nonnull FieldSpec.DataType dataType) {
     try {
-      switch (dataType) {
-        case INT:
-          return Integer.valueOf(input);
-        case LONG:
-          return Long.valueOf(input);
-        case FLOAT:
-          return Float.valueOf(input);
-        case DOUBLE:
-          return Double.valueOf(input);
-        case STRING:
-          return input;
-        default:
-          throw new IllegalStateException("Unsupported data type: " + dataType);
+      if (dataType != FieldSpec.DataType.BYTES) {
+        return (Comparable) dataType.convert(input);
+      } else {
+        return new ByteArray(BytesUtils.toBytes(input));
       }
-    } catch (NumberFormatException e) {
+    } catch (Exception e) {
       throw new BadQueryRequestException(e);
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/SelectionSingleValueColumnWithDictIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/SelectionSingleValueColumnWithDictIterator.java
index 622bfb6..33d3e2a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/SelectionSingleValueColumnWithDictIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/SelectionSingleValueColumnWithDictIterator.java
@@ -20,10 +20,10 @@ package org.apache.pinot.core.query.selection.iterator;
 
 import java.io.Serializable;
 import org.apache.pinot.common.data.FieldSpec;
-import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockSingleValIterator;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.common.utils.BytesUtils;
 
 
 /**
@@ -47,7 +47,7 @@ public class SelectionSingleValueColumnWithDictIterator implements SelectionColu
 
     // For selection, we convert BYTES data type to equivalent HEX string.
     if (_dataType.equals(FieldSpec.DataType.BYTES)) {
-      return ByteArray.toHexString(_dictionary.getBytesValue(_blockSingleValIterator.nextIntVal()));
+      return BytesUtils.toHexString(_dictionary.getBytesValue(_blockSingleValIterator.nextIntVal()));
     }
     return (Serializable) _dictionary.get(_blockSingleValIterator.nextIntVal());
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/StringSelectionColumnIterator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/StringSelectionColumnIterator.java
index 6ae2d35..318f17a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/StringSelectionColumnIterator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/iterator/StringSelectionColumnIterator.java
@@ -21,9 +21,9 @@ package org.apache.pinot.core.query.selection.iterator;
 import com.clearspring.analytics.util.Preconditions;
 import java.io.Serializable;
 import org.apache.pinot.common.data.FieldSpec;
-import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockSingleValIterator;
+import org.apache.pinot.common.utils.BytesUtils;
 
 
 /**
@@ -48,8 +48,7 @@ public class StringSelectionColumnIterator implements SelectionColumnIterator {
 
     if (_dataType.equals(FieldSpec.DataType.BYTES)) {
       // byte[] is converted to equivalent Hex String for selection queries.
-      byte[] bytes = bvIter.nextBytesVal();
-      return ByteArray.toHexString(bytes);
+      return BytesUtils.toHexString(bvIter.nextBytesVal());
     } else {
       return bvIter.nextStringVal();
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
index 860d569..f3b3dcc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/stats/RealtimeColumnStatistics.java
@@ -22,8 +22,8 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
 import org.apache.pinot.common.data.FieldSpec;
-import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.common.Block;
+import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockMultiValIterator;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
@@ -135,40 +135,34 @@ public class RealtimeColumnStatistics implements ColumnStatistics {
 
   @Override
   public boolean isSorted() {
-    // Multivalue columns can't be in sorted order
-    if (!_block.getMetadata().isSingleValue()) {
+    BlockMetadata blockMetadata = _block.getMetadata();
+
+    // Multi-valued column cannot be sorted
+    if (!blockMetadata.isSingleValue()) {
       return false;
     }
 
-    // If this is a single value, then by definition the data is sorted
-    final int blockLength = _block.getMetadata().getLength();
-    if (blockLength <= 1 || getCardinality() <= 1) {
+    // If there is only one distinct value, then it is sorted
+    if (getCardinality() == 1) {
+      return true;
+    }
+
+    // If sorted doc Ids are provided, then it is sorted
+    if (_sortedDocIdIterationOrder != null) {
       return true;
     }
 
     // Iterate over all data to figure out whether or not it's in sorted order
     SingleColumnSingleValueReader singleValueReader = ((SingleValueBlock) _block).getReader();
 
-    int docIdIndex = _sortedDocIdIterationOrder != null ? _sortedDocIdIterationOrder[0] : 0;
-    int dictionaryId = singleValueReader.getInt(docIdIndex);
-    Object previousValue = _dictionaryReader.get(dictionaryId);
-    for (int i = 1; i < blockLength; i++) {
-      docIdIndex = _sortedDocIdIterationOrder != null ? _sortedDocIdIterationOrder[i] : i;
-      dictionaryId = singleValueReader.getInt(docIdIndex);
-      Object currentValue = _dictionaryReader.get(dictionaryId);
-      // If previousValue is greater than currentValue
-      switch (_block.getMetadata().getDataType().getStoredType()) {
-        case BYTES:
-          if (0 < ByteArray.compare((byte[]) previousValue, (byte[]) currentValue)) {
-            return false;
-          }
-          break;
-        default:
-          if (0 < ((Comparable) previousValue).compareTo(currentValue)) {
-            return false;
-          }
+    int numDocs = blockMetadata.getLength();
+    int previousDictId = singleValueReader.getInt(0);
+    for (int docId = 1; docId < numDocs; docId++) {
+      int currentDictId = singleValueReader.getInt(docId);
+      if (_dictionaryReader.compare(previousDictId, currentDictId) > 0) {
+        return false;
       }
-      previousValue = currentValue;
+      previousDictId = currentDictId;
     }
 
     return true;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java
index a8d9638..6841112 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOffHeapMutableDictionary.java
@@ -209,36 +209,6 @@ public abstract class BaseOffHeapMutableDictionary extends MutableDictionary {
   }
 
   @Override
-  public int indexOf(Object rawValue) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Object get(int dictionaryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLongValue(int dictionaryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public double getDoubleValue(int dictionaryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public float getFloatValue(int dictionaryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getIntValue(int dictionaryId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
   public int length() {
     return _numEntries;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java
index e19f4a1..d2da10c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BaseOnHeapMutableDictionary.java
@@ -44,7 +44,6 @@ public abstract class BaseOnHeapMutableDictionary extends MutableDictionary {
   /**
    * For performance, we don't validate the dictId passed in. It should be returned by index() or indexOf().
    */
-  @Nonnull
   @Override
   public Object get(int dictId) {
     return _dictIdToValue[dictId >>> SHIFT_OFFSET][dictId & MASK];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
index 5220749..1088590 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOffHeapMutableDictionary.java
@@ -21,9 +21,7 @@ package org.apache.pinot.core.realtime.impl.dictionary;
 import java.io.IOException;
 import java.util.Arrays;
 import javax.annotation.Nonnull;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
 import org.apache.pinot.core.io.writer.impl.MutableOffHeapByteArrayStore;
@@ -56,30 +54,23 @@ public class BytesOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
 
   @Override
   public int indexOf(Object rawValue) {
-    byte[] bytes = null;
-    // Convert hex string to byte[].
-    if (rawValue instanceof byte[]) {
-      bytes = (byte[]) rawValue;
-    } else if (rawValue instanceof String) {
-      try {
-        bytes = Hex.decodeHex(((String) rawValue).toCharArray());
-      } catch (DecoderException e) {
-        Utils.rethrowException(e);
-      }
-    } else {
-      assert rawValue instanceof byte[];
-    }
+    byte[] bytes = BytesUtils.toBytes(rawValue);
     return getDictId(new ByteArray(bytes), bytes);
   }
 
   @Override
   public byte[] get(int dictId) {
-    return _byteStore.get(dictId);
+    return getBytesValue(dictId);
+  }
+
+  @Override
+  public String getStringValue(int dictId) {
+    return BytesUtils.toHexString(getBytesValue(dictId));
   }
 
   @Override
   public byte[] getBytesValue(int dictId) {
-    return get(dictId);
+    return _byteStore.get(dictId);
   }
 
   @Override
@@ -95,18 +86,7 @@ public class BytesOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
 
   @Override
   public void index(@Nonnull Object rawValue) {
-    byte[] bytes = null;
-    // Convert hex string to byte[].
-    if (rawValue instanceof String) {
-      try {
-        bytes = Hex.decodeHex(((String) rawValue).toCharArray());
-      } catch (DecoderException e) {
-        Utils.rethrowException(e);
-      }
-    } else {
-      assert rawValue instanceof byte[];
-      bytes = (byte[]) rawValue;
-    }
+    byte[] bytes = BytesUtils.toBytes(rawValue);
     ByteArray byteArray = new ByteArray(bytes);
     indexValue(byteArray, bytes);
     updateMinMax(byteArray);
@@ -115,7 +95,8 @@ public class BytesOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   @Override
   public boolean inRange(@Nonnull String lower, @Nonnull String upper, int dictIdToCompare, boolean includeLower,
       boolean includeUpper) {
-    throw new UnsupportedOperationException("In-range not supported for Bytes data type.");
+    return valueInRange(new ByteArray(BytesUtils.toBytes(lower)), new ByteArray(BytesUtils.toBytes(upper)),
+        includeLower, includeUpper, new ByteArray(getBytesValue(dictIdToCompare)));
   }
 
   @Nonnull
@@ -174,4 +155,9 @@ public class BytesOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   public int getAvgValueSize() {
     return (int) _byteStore.getAvgValueSize();
   }
+
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    return ByteArray.compare(getBytesValue(dictId1), getBytesValue(dictId2));
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
index 37236c3..3e223a0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/BytesOnHeapMutableDictionary.java
@@ -20,9 +20,7 @@ package org.apache.pinot.core.realtime.impl.dictionary;
 
 import java.util.Arrays;
 import javax.annotation.Nonnull;
-import org.apache.commons.codec.DecoderException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.primitive.ByteArray;
 
 
@@ -36,46 +34,28 @@ public class BytesOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
 
   @Override
   public int indexOf(Object rawValue) {
-    byte[] bytes = null;
-    // Convert hex string to byte[].
-    if (rawValue instanceof byte[]) {
-      bytes = (byte[]) rawValue;
-    } else if (rawValue instanceof String) {
-      try {
-        bytes = Hex.decodeHex(((String) rawValue).toCharArray());
-      } catch (DecoderException e) {
-        Utils.rethrowException(e);
-      }
-    } else {
-      assert rawValue instanceof byte[];
-    }
+    byte[] bytes = BytesUtils.toBytes(rawValue);
     return getDictId(new ByteArray(bytes));
   }
 
   @Override
   public byte[] get(int dictId) {
-    return ((ByteArray) super.get(dictId)).getBytes();
+    return getBytesValue(dictId);
+  }
+
+  @Override
+  public String getStringValue(int dictId) {
+    return BytesUtils.toHexString(getBytesValue(dictId));
   }
 
   @Override
   public byte[] getBytesValue(int dictId) {
-    return get(dictId);
+    return ((ByteArray) super.get(dictId)).getBytes();
   }
 
   @Override
   public void index(@Nonnull Object rawValue) {
-    byte[] bytes = null;
-    // Convert hex string to byte[].
-    if (rawValue instanceof String) {
-      try {
-        bytes = Hex.decodeHex(((String) rawValue).toCharArray());
-      } catch (DecoderException e) {
-        Utils.rethrowException(e);
-      }
-    } else {
-      assert rawValue instanceof byte[];
-      bytes = (byte[]) rawValue;
-    }
+    byte[] bytes = BytesUtils.toBytes(rawValue);
     ByteArray byteArray = new ByteArray(bytes);
     indexValue(byteArray);
     updateMinMax(byteArray);
@@ -84,7 +64,8 @@ public class BytesOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   @Override
   public boolean inRange(@Nonnull String lower, @Nonnull String upper, int dictIdToCompare, boolean includeLower,
       boolean includeUpper) {
-    throw new UnsupportedOperationException("In-range not supported for Bytes data type.");
+    return valueInRange(new ByteArray(BytesUtils.toBytes(lower)), new ByteArray(BytesUtils.toBytes(upper)),
+        includeLower, includeUpper, (ByteArray) super.get(dictIdToCompare));
   }
 
   @Nonnull
@@ -113,6 +94,11 @@ public class BytesOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
     return sortedValues;
   }
 
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    return ByteArray.compare(getBytesValue(dictId1), getBytesValue(dictId2));
+  }
+
   private void updateMinMax(ByteArray value) {
     if (_min == null) {
       _min = value;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
index febc672..6381e96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOffHeapMutableDictionary.java
@@ -39,8 +39,28 @@ public class DoubleOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
         allocationContext);
   }
 
-  public Object get(int dictionaryId) {
-    return _dictIdToValue.getDouble(dictionaryId);
+  public Double get(int dictId) {
+    return getDoubleValue(dictId);
+  }
+
+  @Override
+  public int getIntValue(int dictId) {
+    return (int) getDoubleValue(dictId);
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return (long) getDoubleValue(dictId);
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return (float) getDoubleValue(dictId);
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return _dictIdToValue.getDouble(dictId);
   }
 
   @Override
@@ -132,28 +152,13 @@ public class DoubleOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   }
 
   @Override
-  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
-    _dictIdToValue.setDouble(dictId, (Double) value);
-  }
-
-  @Override
-  public int getIntValue(int dictId) {
-    return ((Double) get(dictId)).intValue();
-  }
-
-  @Override
-  public long getLongValue(int dictId) {
-    return ((Double) get(dictId)).longValue();
-  }
-
-  @Override
-  public float getFloatValue(int dictId) {
-    return ((Double) get(dictId)).floatValue();
+  public int compare(int dictId1, int dictId2) {
+    return Double.compare(getDoubleValue(dictId1), getDoubleValue(dictId2));
   }
 
   @Override
-  public double getDoubleValue(int dictId) {
-    return (Double) get(dictId);
+  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
+    _dictIdToValue.setDouble(dictId, (Double) value);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
index f9b5f2c..f672c74 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/DoubleOnHeapMutableDictionary.java
@@ -109,6 +109,11 @@ public class DoubleOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   }
 
   @Override
+  public int compare(int dictId1, int dictId2) {
+    return Double.compare(getDoubleValue(dictId1), getDoubleValue(dictId2));
+  }
+
+  @Override
   public int getIntValue(int dictId) {
     return ((Double) get(dictId)).intValue();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
index 6c1ccfd..89dfcf5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOffHeapMutableDictionary.java
@@ -39,8 +39,29 @@ public class FloatOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
         allocationContext);
   }
 
-  public Object get(int dictionaryId) {
-    return _dictIdToValue.getFloat(dictionaryId);
+  @Override
+  public Float get(int dictId) {
+    return getFloatValue(dictId);
+  }
+
+  @Override
+  public int getIntValue(int dictId) {
+    return (int) getDoubleValue(dictId);
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return (long) getDoubleValue(dictId);
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return _dictIdToValue.getFloat(dictId);
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return getFloatValue(dictId);
   }
 
   @Override
@@ -132,28 +153,13 @@ public class FloatOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   }
 
   @Override
-  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
-    _dictIdToValue.setFloat(dictId, (Float) value);
+  public int compare(int dictId1, int dictId2) {
+    return Float.compare(getFloatValue(dictId1), getFloatValue(dictId2));
   }
 
   @Override
-  public int getIntValue(int dictId) {
-    return ((Float) get(dictId)).intValue();
-  }
-
-  @Override
-  public long getLongValue(int dictId) {
-    return ((Float) get(dictId)).longValue();
-  }
-
-  @Override
-  public float getFloatValue(int dictId) {
-    return (Float) get(dictId);
-  }
-
-  @Override
-  public double getDoubleValue(int dictId) {
-    return ((Float) get(dictId)).doubleValue();
+  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
+    _dictIdToValue.setFloat(dictId, (Float) value);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
index 5db2d90..ac8d124 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/FloatOnHeapMutableDictionary.java
@@ -109,6 +109,11 @@ public class FloatOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   }
 
   @Override
+  public int compare(int dictId1, int dictId2) {
+    return Float.compare(getFloatValue(dictId1), getFloatValue(dictId2));
+  }
+
+  @Override
   public int getIntValue(int dictId) {
     return ((Float) get(dictId)).intValue();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
index 24d328d..a0bbad8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOffHeapMutableDictionary.java
@@ -39,8 +39,29 @@ public class IntOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
         allocationContext);
   }
 
-  public Object get(int dictionaryId) {
-    return _dictIdToValue.getInt(dictionaryId);
+  @Override
+  public Integer get(int dictId) {
+    return getIntValue(dictId);
+  }
+
+  @Override
+  public int getIntValue(int dictId) {
+    return _dictIdToValue.getInt(dictId);
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return getIntValue(dictId);
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return getIntValue(dictId);
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return getIntValue(dictId);
   }
 
   @Override
@@ -132,28 +153,13 @@ public class IntOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
   }
 
   @Override
-  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
-    _dictIdToValue.setInt(dictId, (Integer) value);
+  public int compare(int dictId1, int dictId2) {
+    return Integer.compare(getIntValue(dictId1), getIntValue(dictId2));
   }
 
   @Override
-  public int getIntValue(int dictId) {
-    return (Integer) get(dictId);
-  }
-
-  @Override
-  public long getLongValue(int dictId) {
-    return (Integer) get(dictId);
-  }
-
-  @Override
-  public float getFloatValue(int dictId) {
-    return (Integer) get(dictId);
-  }
-
-  @Override
-  public double getDoubleValue(int dictId) {
-    return (Integer) get(dictId);
+  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
+    _dictIdToValue.setInt(dictId, (Integer) value);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
index 5b39570..5ae074d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/IntOnHeapMutableDictionary.java
@@ -109,6 +109,11 @@ public class IntOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   }
 
   @Override
+  public int compare(int dictId1, int dictId2) {
+    return Integer.compare(getIntValue(dictId1), getIntValue(dictId2));
+  }
+
+  @Override
   public int getIntValue(int dictId) {
     return (Integer) get(dictId);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
index d2cfc77..997ced0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOffHeapMutableDictionary.java
@@ -39,8 +39,29 @@ public class LongOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
         allocationContext);
   }
 
-  public Object get(int dictionaryId) {
-    return _dictIdToValue.getLong(dictionaryId);
+  @Override
+  public Long get(int dictId) {
+    return getLongValue(dictId);
+  }
+
+  @Override
+  public int getIntValue(int dictId) {
+    return (int) getLongValue(dictId);
+  }
+
+  @Override
+  public long getLongValue(int dictId) {
+    return _dictIdToValue.getLong(dictId);
+  }
+
+  @Override
+  public float getFloatValue(int dictId) {
+    return getLongValue(dictId);
+  }
+
+  @Override
+  public double getDoubleValue(int dictId) {
+    return getLongValue(dictId);
   }
 
   @Override
@@ -132,28 +153,13 @@ public class LongOffHeapMutableDictionary extends BaseOffHeapMutableDictionary {
   }
 
   @Override
-  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
-    _dictIdToValue.setLong(dictId, (Long) value);
+  public int compare(int dictId1, int dictId2) {
+    return Long.compare(getLongValue(dictId1), getLongValue(dictId2));
   }
 
   @Override
-  public int getIntValue(int dictId) {
-    return ((Long) get(dictId)).intValue();
-  }
-
-  @Override
-  public long getLongValue(int dictId) {
-    return (Long) get(dictId);
-  }
-
-  @Override
-  public float getFloatValue(int dictId) {
-    return ((Long) get(dictId)).floatValue();
-  }
-
-  @Override
-  public double getDoubleValue(int dictId) {
-    return ((Long) get(dictId)).doubleValue();
+  protected void setRawValueAt(int dictId, Object value, byte[] serializedValue) {
+    _dictIdToValue.setLong(dictId, (Long) value);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
index 1ffd887..3805c85 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/LongOnHeapMutableDictionary.java
@@ -109,6 +109,11 @@ public class LongOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   }
 
   @Override
+  public int compare(int dictId1, int dictId2) {
+    return Long.compare(getLongValue(dictId1), getLongValue(dictId2));
+  }
+
+  @Override
   public int getIntValue(int dictId) {
     return ((Long) get(dictId)).intValue();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionary.java
index 99d831c..1f9855b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionary.java
@@ -51,6 +51,11 @@ public abstract class MutableDictionary extends BaseDictionary {
   public abstract boolean isEmpty();
 
   /**
+   * Compares the values stored at dictId1 and dictId2; returns the result of {@code value1.compareTo(value2)}.
+   */
+  public abstract int compare(int dictId1, int dictId2);
+
+  /**
    * Helper method to identify if given (Comparable) value is in provided range.
    *
    * @param lower Lower value of range
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOffHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
index a318f1e..3fc236d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOffHeapMutableDictionary.java
@@ -49,8 +49,8 @@ public class StringOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   }
 
   @Override
-  public Object get(int dictionaryId) {
-    return StringUtil.decodeUtf8(_byteStore.get(dictionaryId));
+  public String get(int dictId) {
+    return StringUtil.decodeUtf8(_byteStore.get(dictId));
   }
 
   @Override
@@ -90,19 +90,19 @@ public class StringOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
 
   @Nonnull
   @Override
-  public Object getMinVal() {
+  public String getMinVal() {
     return _min;
   }
 
   @Nonnull
   @Override
-  public Object getMaxVal() {
+  public String getMaxVal() {
     return _max;
   }
 
   @Nonnull
   @Override
-  public Object getSortedValues() {
+  public String[] getSortedValues() {
     int numValues = length();
     String[] sortedValues = new String[numValues];
 
@@ -141,4 +141,9 @@ public class StringOffHeapMutableDictionary extends BaseOffHeapMutableDictionary
   public int getAvgValueSize() {
     return (int) _byteStore.getAvgValueSize();
   }
+
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    return getStringValue(dictId1).compareTo(getStringValue(dictId2));
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOnHeapMutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
index 7e50e33..8f2e183 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/dictionary/StringOnHeapMutableDictionary.java
@@ -81,23 +81,8 @@ public class StringOnHeapMutableDictionary extends BaseOnHeapMutableDictionary {
   }
 
   @Override
-  public int getIntValue(int dictId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLongValue(int dictId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public float getFloatValue(int dictId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public double getDoubleValue(int dictId) {
-    throw new UnsupportedOperationException();
+  public int compare(int dictId1, int dictId2) {
+    return getStringValue(dictId1).compareTo(getStringValue(dictId2));
   }
 
   private void updateMinMax(String value) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java
index 41f92ea..b2dc426 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseDictionary.java
@@ -51,7 +51,6 @@ public abstract class BaseDictionary implements Dictionary {
   }
 
   @Override
-
   public byte[] getBytesValue(int dictId) {
     throw new UnsupportedOperationException();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java
index 7ce3500..1f35f80 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BytesDictionary.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.index.readers;
 
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
 
 
@@ -38,12 +39,17 @@ public class BytesDictionary extends ImmutableDictionaryReader {
 
   @Override
   public int insertionIndexOf(Object rawValue) {
-    return binarySearch((byte[]) rawValue);
+    return binarySearch(BytesUtils.toBytes(rawValue));
   }
 
   @Override
   public byte[] get(int dictId) {
-    return getBytes(dictId, getBuffer());
+    return getBytesValue(dictId);
+  }
+
+  @Override
+  public String getStringValue(int dictId) {
+    return BytesUtils.toHexString(getBytesValue(dictId));
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/FixedIntArrayOffHeapIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/util/FixedIntArrayOffHeapIdMap.java
index 26cd9ea..d73a515 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/FixedIntArrayOffHeapIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/FixedIntArrayOffHeapIdMap.java
@@ -83,7 +83,8 @@ public class FixedIntArrayOffHeapIdMap extends BaseOffHeapMutableDictionary impl
     }
   }
 
-  public Object get(int dictId) {
+  @Override
+  public FixedIntArray get(int dictId) {
     int[] value = new int[_numColumns];
     for (int col = 0; col < _numColumns; col++) {
       value[col] = _dictIdToValue.getInt(dictId, col);
@@ -145,4 +146,9 @@ public class FixedIntArrayOffHeapIdMap extends BaseOffHeapMutableDictionary impl
   public int getAvgValueSize() {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public int compare(int dictId1, int dictId2) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java
index 99a28d2..fd9eaa3 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/dictionary/MutableDictionaryTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
-import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.utils.primitive.ByteArray;
@@ -176,7 +175,7 @@ public class MutableDictionaryTest {
           (i == 0 && dataType == FieldSpec.DataType.INT) ? Integer.MIN_VALUE : makeRandomObjectOfType(dataType);
 
       Object rawValue = dataType == FieldSpec.DataType.BYTES ? ((i % 2 == 0) ? ((ByteArray) value).getBytes()
-          : Hex.encodeHexString(((ByteArray) value).getBytes())) : value;
+          : ((ByteArray) value).toHexString()) : value;
       if (valueToDictId.containsKey(value)) {
         Assert.assertEquals(dictionary.indexOf(rawValue), (int) valueToDictId.get(value));
       } else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReaderTest.java
index 4f007bb..695b0e1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/readers/ImmutableDictionaryReaderTest.java
@@ -278,6 +278,7 @@ public class ImmutableDictionaryReaderTest {
         assertEquals(new ByteArray(bytesDictionary.getBytesValue(i)), _bytesValues[i]);
 
         assertEquals(bytesDictionary.indexOf(_bytesValues[i].getBytes()), i);
+        assertEquals(bytesDictionary.indexOf(_bytesValues[i].toHexString()), i);
 
         byte[] randomBytes = new byte[BYTES_LENGTH];
         RANDOM.nextBytes(randomBytes);
diff --git a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/DictionariesTest.java b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/DictionariesTest.java
index c39b721..468b922 100644
--- a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/DictionariesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/DictionariesTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.FieldSpec.DataType;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.primitive.ByteArray;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
@@ -46,6 +47,7 @@ import org.apache.pinot.core.segment.creator.impl.SegmentCreationDriverFactory;
 import org.apache.pinot.core.segment.creator.impl.SegmentDictionaryCreator;
 import org.apache.pinot.core.segment.creator.impl.V1Constants;
 import org.apache.pinot.core.segment.creator.impl.stats.AbstractColumnStatisticsCollector;
+import org.apache.pinot.core.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector;
 import org.apache.pinot.core.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector;
 import org.apache.pinot.core.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector;
 import org.apache.pinot.core.segment.creator.impl.stats.IntColumnPreIndexStatsCollector;
@@ -371,6 +373,32 @@ public class DictionariesTest {
     Assert.assertFalse(statsCollector.isSorted());
   }
 
+  @Test
+  public void testBytesColumnPreIndexStatsCollector() {
+    AbstractColumnStatisticsCollector statsCollector = buildStatsCollector("column1", DataType.BYTES);
+    statsCollector.collect(new byte[]{1});
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{1});
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{1, 2});
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{1, 2, 3});
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{1, 2, 3, 4});
+    Assert.assertTrue(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{0});
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{0, 1});
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.collect(new byte[]{1});
+    Assert.assertFalse(statsCollector.isSorted());
+    statsCollector.seal();
+    Assert.assertEquals(statsCollector.getCardinality(), 6);
+    Assert.assertEquals(statsCollector.getMinValue(), new ByteArray(new byte[]{0}));
+    Assert.assertEquals(statsCollector.getMaxValue(), new ByteArray(new byte[]{1, 2, 3, 4}));
+    Assert.assertFalse(statsCollector.isSorted());
+  }
+
   /**
    * Test for ensuring that Strings with special characters can be handled
    * correctly.
@@ -437,22 +465,17 @@ public class DictionariesTest {
     switch (dataType) {
       case INT:
         return new IntColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
       case LONG:
         return new LongColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
       case FLOAT:
         return new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
       case DOUBLE:
         return new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
-      case STRING:
-        return new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
       case BOOLEAN:
+      case STRING:
         return new StringColumnPreIndexStatsCollector(column, statsCollectorConfig);
-
+      case BYTES:
+        return new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig);
       default:
         throw new IllegalArgumentException("Illegal data type for stats builder: " + dataType);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org