You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/20 00:41:18 UTC

(pinot) branch master updated: Fix partition handling by always using string values (#12115)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c706fd04e5 Fix partition handling by always using string values (#12115)
c706fd04e5 is described below

commit c706fd04e57c8617e168619e3620da74efff7ea1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Dec 19 16:41:13 2023 -0800

    Fix partition handling by always using string values (#12115)
---
 .../MultiPartitionColumnsSegmentPruner.java        |   5 +-
 .../SinglePartitionColumnSegmentPruner.java        |   5 +-
 .../request/context/RequestContextUtils.java       |  35 ++-
 .../query/pruner/ColumnValueSegmentPruner.java     |   5 +-
 .../core/query/pruner/ValueBasedSegmentPruner.java |  11 +-
 .../partitioner/TableConfigPartitioner.java        |   3 +-
 .../indexsegment/mutable/MutableSegmentImpl.java   |   6 +-
 .../stats/AbstractColumnStatisticsCollector.java   |  13 +-
 .../BigDecimalColumnPreIndexStatsCollector.java    |   4 +-
 .../stats/BytesColumnPredIndexStatsCollector.java  |   4 +-
 .../stats/DoubleColumnPreIndexStatsCollector.java  |   4 +-
 .../stats/FloatColumnPreIndexStatsCollector.java   |   4 +-
 .../stats/IntColumnPreIndexStatsCollector.java     |   4 +-
 .../stats/LongColumnPreIndexStatsCollector.java    |   4 +-
 .../stats/StringColumnPreIndexStatsCollector.java  |   4 +-
 .../BoundedColumnValuePartitionFunction.java       |   4 +-
 .../spi/partition/ByteArrayPartitionFunction.java  |   4 +-
 .../spi/partition/HashCodePartitionFunction.java   |   4 +-
 .../spi/partition/ModuloPartitionFunction.java     |  15 +-
 .../spi/partition/Murmur3PartitionFunction.java    | 234 +++--------------
 .../spi/partition/MurmurPartitionFunction.java     |   6 +-
 .../segment/spi/partition/PartitionFunction.java   |   3 +-
 .../spi/partition/PartitionFunctionFactory.java    |   3 +-
 .../spi/partition/PartitionFunctionTest.java       | 277 ++++-----------------
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  21 +-
 25 files changed, 194 insertions(+), 488 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
index b05ab4e55b..6970d3f54e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.sql.FilterKind;
 
 
@@ -140,7 +141,7 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
         if (identifier != null) {
           SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
           return partitionInfo == null || partitionInfo.getPartitions().contains(
-              partitionInfo.getPartitionFunction().getPartition(operands.get(1).getLiteral().getFieldValue()));
+              partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
         } else {
           return true;
         }
@@ -155,7 +156,7 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
           int numOperands = operands.size();
           for (int i = 1; i < numOperands; i++) {
             if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
-                .getPartition(operands.get(i).getLiteral().getFieldValue().toString()))) {
+                .getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
               return true;
             }
           }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
index 80ad4b7e3e..7016484e48 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.sql.FilterKind;
 
 
@@ -129,7 +130,7 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
         Identifier identifier = operands.get(0).getIdentifier();
         if (identifier != null && identifier.getName().equals(_partitionColumn)) {
           return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
-              .getPartition(operands.get(1).getLiteral().getFieldValue().toString()));
+              .getPartition(RequestContextUtils.getStringValue(operands.get(1))));
         } else {
           return true;
         }
@@ -140,7 +141,7 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
           int numOperands = operands.size();
           for (int i = 1; i < numOperands; i++) {
             if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
-                .getPartition(operands.get(i).getLiteral().getFieldValue().toString()))) {
+                .getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
               return true;
             }
           }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
index 0369b29f59..28f3037b25 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
@@ -42,6 +42,8 @@ import org.apache.pinot.common.utils.RegexpPatternConverterUtils;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.sql.FilterKind;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 
@@ -259,15 +261,38 @@ public class RequestContextUtils {
     }
   }
 
-  private static String getStringValue(Expression thriftExpression) {
-    if (thriftExpression.getType() != ExpressionType.LITERAL) {
+  public static String getStringValue(Expression thriftExpression) {
+    Literal literal = thriftExpression.getLiteral();
+    if (literal == null) {
       throw new BadQueryRequestException(
           "Pinot does not support column or function on the right-hand side of the predicate");
     }
-    if (thriftExpression.getLiteral().getSetField() == Literal._Fields.NULL_VALUE) {
-      return "null";
+    switch (literal.getSetField()) {
+      case BOOL_VALUE:
+        return Boolean.toString(literal.getBoolValue());
+      case BYTE_VALUE:
+        return Byte.toString(literal.getByteValue());
+      case SHORT_VALUE:
+        return Short.toString(literal.getShortValue());
+      case INT_VALUE:
+        return Integer.toString(literal.getIntValue());
+      case LONG_VALUE:
+        return Long.toString(literal.getLongValue());
+      case FLOAT_VALUE:
+        return Float.toString(literal.getFloatValue());
+      case DOUBLE_VALUE:
+        return Double.toString(literal.getDoubleValue());
+      case BIG_DECIMAL_VALUE:
+        return BigDecimalUtils.deserialize(literal.getBigDecimalValue()).toPlainString();
+      case STRING_VALUE:
+        return literal.getStringValue();
+      case BINARY_VALUE:
+        return BytesUtils.toHexString(literal.getBinaryValue());
+      case NULL_VALUE:
+        return "null";
+      default:
+        throw new IllegalStateException("Unsupported literal type: " + literal.getSetField());
     }
-    return thriftExpression.getLiteral().getFieldValue().toString();
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index a2b949da9e..8eead9f552 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -107,9 +107,8 @@ public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, dataSourceMetadata.getDataType());
-    Comparable value = cachedValue.getComparableValue();
     // Check min/max value
-    if (!checkMinMaxRange(dataSourceMetadata, value)) {
+    if (!checkMinMaxRange(dataSourceMetadata, cachedValue.getComparableValue())) {
       return true;
     }
     // Check column partition
@@ -117,7 +116,7 @@ public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
     if (partitionFunction != null) {
       Set<Integer> partitions = dataSourceMetadata.getPartitions();
       assert partitions != null;
-      if (!partitions.contains(partitionFunction.getPartition(value))) {
+      if (!partitions.contains(partitionFunction.getPartition(cachedValue.getValue()))) {
         return true;
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
index 76f1b2737b..2e3214c8c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -200,17 +200,21 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
     }
 
     public static class CachedValue {
-      private final Object _value;
+      private final String _value;
       private boolean _hashed = false;
       private long _hash1;
       private long _hash2;
       private DataType _dt;
       private Comparable _comparableValue;
 
-      private CachedValue(Object value) {
+      private CachedValue(String value) {
         _value = value;
       }
 
+      public String getValue() {
+        return _value;
+      }
+
       public Comparable getComparableValue() {
         assert _dt != null;
         return _comparableValue;
@@ -218,9 +222,8 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
 
       public void ensureDataType(DataType dt) {
         if (dt != _dt) {
-          String strValue = _value.toString();
           _dt = dt;
-          _comparableValue = convertValue(strValue, dt);
+          _comparableValue = convertValue(_value, dt);
           _hashed = false;
         }
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
index 4a088f129d..cff7f3b745 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.processing.partitioner;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
@@ -41,6 +42,6 @@ public class TableConfigPartitioner implements Partitioner {
 
   @Override
   public String getPartition(GenericRow genericRow) {
-    return String.valueOf(_partitionFunction.getPartition(genericRow.getValue(_column)));
+    return String.valueOf(_partitionFunction.getPartition(FieldSpec.getStringValue(genericRow.getValue(_column))));
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 36f5d6e81c..55e0aec072 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -680,13 +680,13 @@ public class MutableSegmentImpl implements MutableSegment {
       if (fieldSpec.isSingleValueField()) {
         // Check partitions
         if (column.equals(_partitionColumn)) {
-          Object valueToPartition = (dataType == BYTES) ? new ByteArray((byte[]) value) : value;
-          int partition = _partitionFunction.getPartition(valueToPartition);
+          String stringValue = dataType.toString(value);
+          int partition = _partitionFunction.getPartition(stringValue);
           if (partition != _mainPartitionId) {
             if (indexContainer._partitions.add(partition)) {
               // for every partition other than mainPartitionId, log a warning once
               _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column,
-                  valueToPartition);
+                  stringValue);
             }
             // always emit a metric when a partition other than mainPartitionId is detected
             if (_serverMetrics != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 72f8eabb74..5af1e14999 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -110,6 +110,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
    * Returns the {@link PartitionFunction} for the column.
    * @return Partition function for the column.
    */
+  @Nullable
   public PartitionFunction getPartitionFunction() {
     return _partitionFunction;
   }
@@ -143,18 +144,20 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
     return _partitions;
   }
 
+  protected boolean isPartitionEnabled() {
+    return _partitionFunction != null;
+  }
+
   /**
    * Updates the partition range based on the partition of the given value.
    *
    * @param value Column value.
    */
-  protected void updatePartition(Object value) {
-    if (_partitionFunction != null) {
-      _partitions.add(_partitionFunction.getPartition(value));
-    }
+  protected void updatePartition(String value) {
+    _partitions.add(_partitionFunction.getPartition(value));
   }
 
-  void updateTotalNumberOfEntries(Object[] entries) {
+  protected void updateTotalNumberOfEntries(Object[] entries) {
     _totalNumberOfEntries += entries.length;
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
index 1c4b7913ed..cb1fb992b3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class BigDecimalColumnPreIndexStatsCollector extends AbstractColumnStatis
       int length = BigDecimalUtils.byteSize(value);
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(value.toPlainString());
+        }
         _minLength = Math.min(_minLength, length);
         _maxLength = Math.max(_maxLength, length);
         _maxRowLength = _maxLength;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index 90732d1acc..34cb0ecc1e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -62,7 +62,9 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
       ByteArray value = new ByteArray((byte[]) entry);
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(value.toString());
+        }
         int length = value.length();
         _minLength = Math.min(_minLength, length);
         _maxLength = Math.max(_maxLength, length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
index 2a80f353b8..cd8c9cf4b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatistics
       double value = (double) entry;
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(Double.toString(value));
+        }
       }
 
       _totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
index 09051c6517..fa117aa65e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsC
       float value = (float) entry;
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(Float.toString(value));
+        }
       }
 
       _totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
index d65429e2d7..3675cdd9e4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol
       int value = (int) entry;
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(Integer.toString(value));
+        }
       }
 
       _totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
index 39cb4fa54b..5951444b87 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCo
       long value = (long) entry;
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(Long.toString(value));
+        }
       }
 
       _totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index 980fcd5ba5..b896ac30ff 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -62,7 +62,9 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
       String value = (String) entry;
       addressSorted(value);
       if (_values.add(value)) {
-        updatePartition(value);
+        if (isPartitionEnabled()) {
+          updatePartition(value);
+        }
         int valueLength = value.getBytes(UTF_8).length;
         _minLength = Math.min(_minLength, valueLength);
         _maxLength = Math.max(_maxLength, valueLength);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
index 86fc245275..a72beb34c8 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
@@ -62,9 +62,9 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
   }
 
   @Override
-  public int getPartition(Object value) {
+  public int getPartition(String value) {
     for (int i = 0; i < _numPartitions - 1; i++) {
-      if (_values[i].equalsIgnoreCase(value.toString())) {
+      if (_values[i].equalsIgnoreCase(value)) {
         return i + 1;
       }
     }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
index aa970216a3..e8ff8edc24 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
@@ -42,8 +42,8 @@ public class ByteArrayPartitionFunction implements PartitionFunction {
   }
 
   @Override
-  public int getPartition(Object value) {
-    return abs(Arrays.hashCode(value.toString().getBytes(UTF_8))) % _numPartitions;
+  public int getPartition(String value) {
+    return abs(Arrays.hashCode(value.getBytes(UTF_8))) % _numPartitions;
   }
 
   @Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
index 11d34e3dc9..182760cf44 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
@@ -37,8 +37,8 @@ public class HashCodePartitionFunction implements PartitionFunction {
   }
 
   @Override
-  public int getPartition(Object value) {
-    return abs(value.toString().hashCode()) % _numPartitions;
+  public int getPartition(String value) {
+    return abs(value.hashCode()) % _numPartitions;
   }
 
   @Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
index 7e2ae7bf79..a4b8eb49ab 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
@@ -49,19 +49,8 @@ public class ModuloPartitionFunction implements PartitionFunction {
    * @return Partition id for the given value.
    */
   @Override
-  public int getPartition(Object value) {
-    if (value instanceof Integer) {
-      return toNonNegative((Integer) value % _numPartitions);
-    } else if (value instanceof Long) {
-      // Since _numPartitions is int, the modulo should also be int.
-      return toNonNegative((int) ((Long) value % _numPartitions));
-    } else if (value instanceof String) {
-      // Parse String as Long, to support both Integer and Long.
-      return toNonNegative((int) (Long.parseLong((String) value) % _numPartitions));
-    } else {
-      throw new IllegalArgumentException(
-          "Illegal argument for partitioning, expected Integer, got: " + value.getClass().getSimpleName());
-    }
+  public int getPartition(String value) {
+    return toNonNegative((int) (Long.parseLong(value) % _numPartitions));
   }
 
   @Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
index d66eeca1ae..dad3caa6b2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.hash.Hashing;
 import java.util.Map;
+import org.apache.commons.lang.StringUtils;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
@@ -33,10 +34,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
   public static final byte INVALID_CHAR = (byte) '?';
   private static final String NAME = "Murmur3";
   private static final String SEED_KEY = "seed";
-  private static final String MURMUR3_VARIANT = "variant";
+  private static final String VARIANT_KEY = "variant";
   private final int _numPartitions;
-  private final int _hashSeed;
-  private final String _variant;
+  private final int _seed;
+  private final boolean _useX64;
 
   /**
    * Constructor for the class.
@@ -45,29 +46,33 @@ public class Murmur3PartitionFunction implements PartitionFunction {
    */
   public Murmur3PartitionFunction(int numPartitions, Map<String, String> functionConfig) {
     Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
-    Preconditions.checkArgument(
-        functionConfig == null || functionConfig.get(MURMUR3_VARIANT) == null || functionConfig.get(MURMUR3_VARIANT)
-            .isEmpty() || functionConfig.get(MURMUR3_VARIANT).equals("x86_32") || functionConfig.get(MURMUR3_VARIANT)
-            .equals("x64_32"), "Murmur3 variant must be either x86_32 or x64_32");
     _numPartitions = numPartitions;
 
-    // default value of the hash seed is 0.
-    _hashSeed =
-        (functionConfig == null || functionConfig.get(SEED_KEY) == null || functionConfig.get(SEED_KEY).isEmpty()) ? 0
-            : Integer.parseInt(functionConfig.get(SEED_KEY));
-
-    // default value of the murmur3 variant is x86_32.
-    _variant =
-        (functionConfig == null || functionConfig.get(MURMUR3_VARIANT) == null || functionConfig.get(MURMUR3_VARIANT)
-            .isEmpty()) ? "x86_32" : functionConfig.get(MURMUR3_VARIANT);
+    int seed = 0;
+    boolean useX64 = false;
+    if (functionConfig != null) {
+      String seedString = functionConfig.get(SEED_KEY);
+      if (StringUtils.isNotEmpty(seedString)) {
+        seed = Integer.parseInt(seedString);
+      }
+      String variantString = functionConfig.get(VARIANT_KEY);
+      if (StringUtils.isNotEmpty(variantString)) {
+        if (variantString.equals("x64_32")) {
+          useX64 = true;
+        } else {
+          Preconditions.checkArgument(variantString.equals("x86_32"),
+              "Murmur3 variant must be either x86_32 or x64_32");
+        }
+      }
+    }
+    _seed = seed;
+    _useX64 = useX64;
   }
 
   @Override
-  public int getPartition(Object value) {
-    if (_variant.equals("x86_32")) {
-      return (murmur3Hash32BitsX86(value.toString().getBytes(UTF_8), _hashSeed) & Integer.MAX_VALUE) % _numPartitions;
-    }
-    return (murmur3Hash32BitsX64(value, _hashSeed) & Integer.MAX_VALUE) % _numPartitions;
+  public int getPartition(String value) {
+    int hash = _useX64 ? murmur3Hash32BitsX64(value, _seed) : murmur3Hash32BitsX86(value.getBytes(UTF_8), _seed);
+    return (hash & Integer.MAX_VALUE) % _numPartitions;
   }
 
   @Override
@@ -87,8 +92,8 @@ public class Murmur3PartitionFunction implements PartitionFunction {
   }
 
   @VisibleForTesting
-  int murmur3Hash32BitsX86(byte[] data, int hashSeed) {
-    return Hashing.murmur3_32_fixed(hashSeed).hashBytes(data).asInt();
+  static int murmur3Hash32BitsX86(byte[] data, int seed) {
+    return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
   }
 
   /**
@@ -110,14 +115,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
    * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a>
    */
 
-  private long getblock(byte[] key, int i) {
-    return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] & 0x00000000000000FFL) << 8) | (
-        (key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] & 0x00000000000000FFL) << 24) | (
-        (key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] & 0x00000000000000FFL) << 40) | (
-        (key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] & 0x00000000000000FFL) << 56);
-  }
-
-  private void bmix(State state) {
+  private static void bmix(State state) {
     state._k1 *= state._c1;
     state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
     state._k1 *= state._c2;
@@ -139,7 +137,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
     state._c2 = state._c2 * 5 + 0x6bce6396;
   }
 
-  private long fmix(long k) {
+  private static long fmix(long k) {
     k ^= k >>> 33;
     k *= 0xff51afd7ed558ccdL;
     k ^= k >>> 33;
@@ -149,168 +147,8 @@ public class Murmur3PartitionFunction implements PartitionFunction {
     return k;
   }
 
-  /**
-   * Hash a value using the x64 64 bit variant of MurmurHash3
-   *
-   * @param key value to hash
-   * @param seed random value
-   * @return 64 bit hashed key
-   */
-  private long murmur3Hash64BitsX64(final byte[] key, final int seed) {
-    State state = new State();
-
-    state._h1 = 0x9368e53c2f6af274L ^ seed;
-    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
-    state._c1 = 0x87c37b91114253d5L;
-    state._c2 = 0x4cf5ad432745937fL;
-
-    for (int i = 0; i < key.length / 16; i++) {
-      state._k1 = getblock(key, i * 2 * 8);
-      state._k2 = getblock(key, (i * 2 + 1) * 8);
-
-      bmix(state);
-    }
-
-    state._k1 = 0;
-    state._k2 = 0;
-
-    int tail = (key.length >>> 4) << 4;
-
-    // CHECKSTYLE:OFF
-    switch (key.length & 15) {
-      case 15:
-        state._k2 ^= (long) key[tail + 14] << 48;
-      case 14:
-        state._k2 ^= (long) key[tail + 13] << 40;
-      case 13:
-        state._k2 ^= (long) key[tail + 12] << 32;
-      case 12:
-        state._k2 ^= (long) key[tail + 11] << 24;
-      case 11:
-        state._k2 ^= (long) key[tail + 10] << 16;
-      case 10:
-        state._k2 ^= (long) key[tail + 9] << 8;
-      case 9:
-        state._k2 ^= key[tail + 8];
-      case 8:
-        state._k1 ^= (long) key[tail + 7] << 56;
-      case 7:
-        state._k1 ^= (long) key[tail + 6] << 48;
-      case 6:
-        state._k1 ^= (long) key[tail + 5] << 40;
-      case 5:
-        state._k1 ^= (long) key[tail + 4] << 32;
-      case 4:
-        state._k1 ^= (long) key[tail + 3] << 24;
-      case 3:
-        state._k1 ^= (long) key[tail + 2] << 16;
-      case 2:
-        state._k1 ^= (long) key[tail + 1] << 8;
-      case 1:
-        state._k1 ^= key[tail + 0];
-        bmix(state);
-    }
-
-    // CHECKSTYLE:ON
-    state._h2 ^= key.length;
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    state._h1 = fmix(state._h1);
-    state._h2 = fmix(state._h2);
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    return state._h1;
-  }
-
-  /**
-   * Hash a value using the x64 32 bit variant of MurmurHash3
-   *
-   * @param key value to hash
-   * @param seed random value
-   * @return 32 bit hashed key
-   */
-  private int murmur3Hash32BitsX64(final byte[] key, final int seed) {
-    return (int) (murmur3Hash64BitsX64(key, seed) >>> 32);
-  }
-
-  private long murmur3Hash64BitsX64(final long[] key, final int seed) {
-    // Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
-    State state = new State();
-
-    state._h1 = 0x9368e53c2f6af274L ^ seed;
-    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
-    state._c1 = 0x87c37b91114253d5L;
-    state._c2 = 0x4cf5ad432745937fL;
-
-    for (int i = 0; i < key.length / 2; i++) {
-      state._k1 = key[i * 2];
-      state._k2 = key[i * 2 + 1];
-
-      bmix(state);
-    }
-
-    long tail = key[key.length - 1];
-
-    if (key.length % 2 != 0) {
-      state._k1 ^= tail;
-      bmix(state);
-    }
-
-    state._h2 ^= key.length * 8;
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    state._h1 = fmix(state._h1);
-    state._h2 = fmix(state._h2);
-
-    state._h1 += state._h2;
-    state._h2 += state._h1;
-
-    return state._h1;
-  }
-
-  /**
-   * Hash a value using the x64 32 bit variant of MurmurHash3
-   *
-   * @param key value to hash
-   * @param seed random value
-   * @return 32 bit hashed key
-   */
-  private int murmur3Hash32BitsX64(final long[] key, final int seed) {
-    return (int) (murmur3Hash64BitsX64(key, seed) >>> 32);
-  }
-
   @VisibleForTesting
-  int murmur3Hash32BitsX64(Object o, int seed) {
-    if (o instanceof byte[]) {
-      return murmur3Hash32BitsX64((byte[]) o, seed);
-    } else if (o instanceof long[]) {
-      return murmur3Hash32BitsX64((long[]) o, seed);
-    } else if (o instanceof String) {
-      return murmur3Hash32BitsX64((String) o, seed);
-    } else {
-      // Differing from the source implementation here. The default case in the source implementation is to apply the
-      // hash on the hashcode of the object. The hashcode of an object is not guaranteed to be consistent across JVMs
-      // (except for String values), so we cannot guarantee the same value as the data source. Since we cannot
-      // guarantee similar values, we will instead apply the hash on the string representation of the object, which
-      // aligns with the rest of our code base.
-      return murmur3Hash32BitsX64(o.toString(), seed);
-    }
-  }
-
-  private int murmur3Hash32BitsX64(String s, int seed) {
-    return (int) (murmur3Hash32BitsX64String(s, seed) >> 32);
-  }
-
-  private long murmur3Hash32BitsX64String(String s, long seed) {
-    // Exactly the same as MurmurHash3_x64_64, except it works directly on a String's chars
+  static int murmur3Hash32BitsX64(String s, int seed) {
     State state = new State();
 
     state._h1 = 0x9368e53c2f6af274L ^ seed;
@@ -370,13 +208,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
       }
     }
 
-    // CHECKSTYLE:ON
     long savedK1 = state._k1;
     long savedK2 = state._k2;
     state._k1 = 0;
     state._k2 = 0;
-
-    // CHECKSTYLE:OFF
     switch (byteLen & 15) {
       case 15:
         state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
@@ -411,6 +246,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
         bmix(state);
     }
     // CHECKSTYLE:ON
+
     state._h2 ^= byteLen;
 
     state._h1 += state._h2;
@@ -422,10 +258,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
     state._h1 += state._h2;
     state._h2 += state._h1;
 
-    return state._h1;
+    return (int) (state._h1 >> 32);
   }
 
-  private void addByte(State state, byte b, int len) {
+  private static void addByte(State state, byte b, int len) {
     int shift = (len & 0x7) * 8;
     long bb = (b & 0xffL) << shift;
     if ((len & 0x8) == 0) {
@@ -440,7 +276,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
     }
   }
 
-  static class State {
+  private static class State {
     long _h1;
     long _h2;
 
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
index 7d2b9b8f31..a43a49f789 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
@@ -41,8 +41,8 @@ public class MurmurPartitionFunction implements PartitionFunction {
   }
 
   @Override
-  public int getPartition(Object value) {
-    return (murmur2(value.toString().getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions;
+  public int getPartition(String value) {
+    return (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions;
   }
 
   @Override
@@ -69,7 +69,7 @@ public class MurmurPartitionFunction implements PartitionFunction {
    * @return 32 bit hash of the given array
    */
   @VisibleForTesting
-  int murmur2(final byte[] data) {
+  static int murmur2(final byte[] data) {
     int length = data.length;
     int seed = 0x9747b28c;
     // 'm' and 'r' are mixing constants generated offline.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
index e6404e48f2..3e6d26146d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
@@ -34,11 +34,12 @@ public interface PartitionFunction extends Serializable {
 
   /**
    * Method to compute and return partition id for the given value.
+   * NOTE: The value is expected to be a string representation of the actual value.
    *
    * @param value Value for which to determine the partition id.
    * @return partition id for the value.
    */
-  int getPartition(Object value);
+  int getPartition(String value);
 
   /**
    * Returns the name of the partition function.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 13a60b424b..3f35c80cb4 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.partition;
 
 import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 
 
 /**
@@ -68,7 +69,7 @@ public class PartitionFunctionFactory {
   // The PartitionFunctionFactory should be able to support these default implementations, as well as instantiate
   // based on config
   public static PartitionFunction getPartitionFunction(String functionName, int numPartitions,
-      Map<String, String> functionConfig) {
+      @Nullable Map<String, String> functionConfig) {
     PartitionFunctionType function = PartitionFunctionType.fromString(functionName);
     switch (function) {
       case Modulo:
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index dec22a663d..a80f65d054 100644
--- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -19,9 +19,7 @@
 package org.apache.pinot.segment.spi.partition;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -68,8 +66,6 @@ public class PartitionFunctionTest {
         if (expectedPartition < 0) {
           expectedPartition += numPartitions;
         }
-        assertEquals(partitionFunction.getPartition(value), expectedPartition);
-        assertEquals(partitionFunction.getPartition((long) value), expectedPartition);
         assertEquals(partitionFunction.getPartition(Integer.toString(value)), expectedPartition);
       }
 
@@ -80,7 +76,6 @@ public class PartitionFunctionTest {
         if (expectedPartition < 0) {
           expectedPartition += numPartitions;
         }
-        assertEquals(partitionFunction.getPartition(value), expectedPartition);
         assertEquals(partitionFunction.getPartition(Long.toString(value)), expectedPartition);
       }
     }
@@ -90,7 +85,6 @@ public class PartitionFunctionTest {
    * Unit test for {@link MurmurPartitionFunction}.
    * <ul>
    *   <li> Tests that partition values are in expected range. </li>
-   *   <li> Tests that toString returns expected string. </li>
    * </ul>
    */
   @Test
@@ -109,10 +103,7 @@ public class PartitionFunctionTest {
 
       for (int j = 0; j < NUM_ROUNDS; j++) {
         int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
-        int partition1 = partitionFunction.getPartition(value);
-        int partition2 = partitionFunction.getPartition(Integer.toString(value));
-        assertEquals(partition1, partition2);
-        assertTrue(partition1 >= 0 && partition1 < numPartitions);
+        testPartitionInExpectedRange(partitionFunction, value, numPartitions);
       }
     }
   }
@@ -220,26 +211,26 @@ public class PartitionFunctionTest {
         int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
 
         // check for the partition function with functionConfig as null.
-        testToStringAndPartitionNumber(partitionFunction1, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction1, value, numPartitions);
 
         // check for the partition function with non-null functionConfig but without seed value.
-        testToStringAndPartitionNumber(partitionFunction2, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction2, value, numPartitions);
 
         // check for the partition function with non-null functionConfig and with seed value.
-        testToStringAndPartitionNumber(partitionFunction3, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction3, value, numPartitions);
 
         // check for the partition function with non-null functionConfig and with seed value and variant.
-        testToStringAndPartitionNumber(partitionFunction4, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction4, value, numPartitions);
 
         // check for the partition function with non-null functionConfig and with explicitly provided default seed
         // value and variant.
-        testToStringAndPartitionNumber(partitionFunction5, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction5, value, numPartitions);
 
         // check for the partition function with non-null functionConfig and with empty seed value and default variant.
-        testToStringAndPartitionNumber(partitionFunction6, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction6, value, numPartitions);
 
         // check for the partition function with non-null functionConfig and with empty seed value and empty variant.
-        testToStringAndPartitionNumber(partitionFunction7, value, numPartitions);
+        testPartitionInExpectedRange(partitionFunction7, value, numPartitions);
       }
     }
   }
@@ -247,50 +238,18 @@ public class PartitionFunctionTest {
   @Test
   public void testMurmur3Equivalence() {
 
-    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
-    // expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray.
-    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray = new int[]{
-        -1569442405, -921191038, 16439113, -881572510, 2111401876, 655879980, 1409856380, -1348364123, -1770645361,
-        1277101955
-    };
-
-    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
-    // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray.
-    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray = new int[]{
-        698083240, 174075836, -938825597, 155806634, -831733828, 319389887, -939822329, -1785781936, -1796939240,
-        757512622
-    };
-
-    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
-    // expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray.
-    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray = new int[]{
-        -621156783, -1341356662, 1615513844, 1608247599, -1339558745, -1782606270, 1204009437, 8939246, -42073819,
-        1268621125
-    };
-
-    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
-    // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray.
-    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray = new int[]{
-        -159780599, 1266925141, -2039451704, 237323842, -1373894107, -1718192521, 314068498, 1377198162, 1239340429,
-        -1643307044
-    };
-
     // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
     // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 0, applied right shift
-    // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithZeroSeedForString.
-    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForString = new int[]{
+    // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithZeroSeed.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeed = new int[]{
         -930531654, 1010637996, -1251084035, -1551293561, 1591443335, 181872103, 1308755538, -432310401, -701537488,
         674867586
     };
 
     // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
     // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 0, applied right shift
-    // on 32 bits those values and stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForString.
-    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForString = new int[]{
+    // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithNonZeroSeed.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeed = new int[]{
         1558697417, 933581816, 1071120824, 1964512897, 1629803052, 2037246152, -1867319466, -1003065762, -275998120,
         1386652858
     };
@@ -311,44 +270,23 @@ public class PartitionFunctionTest {
         -19253806
     };
 
-    // Test for 32 bit murmur3 hash with x64_64 variant and seed = 0 for byte array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray,
-        "byteArray", "x64_32");
-
-    // Test for 32 bit murmur3 hash with x64_64 variant and seed = 9001 for byte array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray,
-        "byteArray", "x64_32");
-
-    // Test for 32 bit murmur3 hash with x64_64 variant and seed = 0 for long array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray,
-        "longArray", "x64_32");
-
-    // Test for 32 bit murmur3 hash with x64_64 variant and seed = 9001 for long array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray,
-        "longArray", "x64_32");
-
     // Test for 64 bit murmur3 hash with x64_64 variant and seed = 0 for String.
-    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForString, "String",
-        "x64_32");
+    testMurmur3Hash(0, expectedMurmurValuesFor32BitX64WithZeroSeed, true);
 
     // Test for 64 bit murmur3 hash with x64_64 variant and seed = 9001 for String.
-    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForString,
-        "String", "x64_32");
+    testMurmur3Hash(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeed, true);
 
     // Test for 32 bit murmur3 hash with x86_32 variant and seed = 0 for byte array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX86WithZeroSeed, "byteArray",
-        "x86_32");
+    testMurmur3Hash(0, expectedMurmurValuesFor32BitX86WithZeroSeed, false);
 
     // Test for 32 bit murmur3 hash with x86_32 variant and seed = 9001 for byte array.
-    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, "byteArray",
-        "x86_32");
+    testMurmur3Hash(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, false);
   }
 
   /**
    * Unit test for {@link MurmurPartitionFunction}.
    * <ul>
    *   <li> Tests that partition values are in expected range. </li>
-   *   <li> Tests that toString returns expected string. </li>
    * </ul>
    */
   @Test
@@ -367,10 +305,7 @@ public class PartitionFunctionTest {
 
       for (int j = 0; j < NUM_ROUNDS; j++) {
         int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
-        int partition1 = partitionFunction.getPartition(value);
-        int partition2 = partitionFunction.getPartition(Integer.toString(value));
-        assertEquals(partition1, partition2);
-        assertTrue(partition1 >= 0 && partition1 < numPartitions);
+        testPartitionInExpectedRange(partitionFunction, value, numPartitions);
       }
     }
   }
@@ -394,7 +329,6 @@ public class PartitionFunctionTest {
         int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
         int hashCode = Integer.toString(value).hashCode();
         int expectedPartition = ((hashCode == Integer.MIN_VALUE) ? 0 : Math.abs(hashCode)) % numPartitions;
-        assertEquals(partitionFunction.getPartition(value), expectedPartition);
         assertEquals(partitionFunction.getPartition(Integer.toString(value)), expectedPartition);
       }
 
@@ -403,7 +337,6 @@ public class PartitionFunctionTest {
         double value = j == 0 ? Double.NEGATIVE_INFINITY : random.nextDouble();
         int hashCode = Double.toString(value).hashCode();
         int expectedPartition = ((hashCode == Integer.MIN_VALUE) ? 0 : Math.abs(hashCode)) % numPartitions;
-        assertEquals(partitionFunction.getPartition(value), expectedPartition);
         assertEquals(partitionFunction.getPartition(Double.toString(value)), expectedPartition);
       }
     }
@@ -467,17 +400,13 @@ public class PartitionFunctionTest {
     long seed = 100;
     Random random = new Random(seed);
 
-    int numPartitions = 5;
-    MurmurPartitionFunction murmurPartitionFunction = new MurmurPartitionFunction(numPartitions);
-
     // Generate the same values as above - 10 random values of size 7, using {@link Random::nextBytes} with seed 100
     // Apply {@link MurmurPartitionFunction::murmur2
     // compare with stored results
-    byte[] array = new byte[7];
+    byte[] bytes = new byte[7];
     for (int expectedMurmurValue : expectedMurmurValues) {
-      random.nextBytes(array);
-      int actualMurmurValue = murmurPartitionFunction.murmur2(array);
-      assertEquals(actualMurmurValue, expectedMurmurValue);
+      random.nextBytes(bytes);
+      assertEquals(MurmurPartitionFunction.murmur2(bytes), expectedMurmurValue);
     }
   }
 
@@ -501,7 +430,7 @@ public class PartitionFunctionTest {
 
     // generate the same 10 String values
     // Apply the partition function and compare with stored results
-    testPartitionFunctionEquivalence(murmurPartitionFunction, expectedPartitions);
+    testPartitionFunction(murmurPartitionFunction, expectedPartitions);
   }
 
   @Test
@@ -510,31 +439,17 @@ public class PartitionFunctionTest {
     // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
     // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
     // expectedMurmurValuesFor32BitX64WithZeroSeed.
-    int[] expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString = new int[]{
+    int[] expectedPartitions32BitsX64WithZeroSeed = new int[]{
         4, 1, 3, 2, 0, 3, 3, 2, 0, 1
     };
 
     // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
     // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
     // stored in expectedMurmurValuesFor32BitX64WithZeroSeed.
-    int[] expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString = new int[]{
+    int[] expectedPartitions32BitsX64WithNonZeroSeed = new int[]{
         2, 1, 4, 2, 2, 2, 2, 1, 3, 3
     };
 
-    // 10 long[] values of size 10, were randomly generated, using {@link Random::nextLong} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
-    // expectedPartitions32BitsX64WithZeroSeedForLongArray.
-    int[] expectedPartitions32BitsX64WithZeroSeedForLongArray = new int[]{
-        0, 1, 4, 4, 3, 3, 2, 1, 4, 0
-    };
-
-    // 10 long[] values of size 10, were randomly generated, using {@link Random::nextLong} with seed 100
-    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
-    // expectedPartitions32BitsX64WithNonZeroSeedForLongArray.
-    int[] expectedPartitions32BitsX64WithNonZeroSeedForLongArray = new int[]{
-        4, 1, 4, 2, 1, 2, 3, 2, 4, 4
-    };
-
     // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
     // Applied com.google.common.hash.hashing::murmur3_32_fixed with seed = 0 to those values and stored in
     // expectedMurmurValuesFor32BitX64WithZeroSeed.
@@ -575,22 +490,13 @@ public class PartitionFunctionTest {
     // Generate the same 10 String values. Test if the calculated values are equal for both String and byte[] (they
     // should be equal when String is converted to byte[]) and if the values are equal to the expected values for the
     // x64_32 variant with seed = 0 and x64_32 variant with seed = 9001.
-    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction1,
-        expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString);
-    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction2,
-        expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString);
-
-    // Generate the same 10 long[] values. Test if the calculated values are equal to the expected values for the x64_32
-    // variant with seed = 0 and x64_32 variant with seed = 9001.
-    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction1,
-        expectedPartitions32BitsX64WithZeroSeedForLongArray);
-    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction2,
-        expectedPartitions32BitsX64WithNonZeroSeedForLongArray);
+    testPartitionFunction(murmur3PartitionFunction1, expectedPartitions32BitsX64WithZeroSeed);
+    testPartitionFunction(murmur3PartitionFunction2, expectedPartitions32BitsX64WithNonZeroSeed);
 
     // Generate the same 10 String values. Test if the calculated values are equal to the expected values for the x86_32
     // variant with seed = 0 and x86_32 variant with seed = 9001.
-    testPartitionFunctionEquivalence(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
-    testPartitionFunctionEquivalence(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
+    testPartitionFunction(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
+    testPartitionFunction(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
   }
 
   /**
@@ -609,132 +515,43 @@ public class PartitionFunctionTest {
 
     // generate the same 10 String values
     // Apply the partition function and compare with stored results
-    testPartitionFunctionEquivalence(byteArrayPartitionFunction, expectedPartitions);
+    testPartitionFunction(byteArrayPartitionFunction, expectedPartitions);
   }
 
-  private void testPartitionFunctionEquivalence(PartitionFunction partitionFunction, int[] expectedPartitions) {
+  private void testPartitionInExpectedRange(PartitionFunction partitionFunction, Object value, int numPartitions) {
+    int partition = partitionFunction.getPartition(value.toString());
+    assertTrue(partition >= 0 && partition < numPartitions);
+  }
+
+  private void testPartitionFunction(PartitionFunction partitionFunction, int[] expectedPartitions) {
     long seed = 100;
     Random random = new Random(seed);
 
     // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
     // Apply given partition function
     // compare with expectedPartitions
-    byte[] array = new byte[7];
-    for (int expectedPartition : expectedPartitions) {
-      random.nextBytes(array);
-      String nextString = new String(array, UTF_8);
-      int actualPartition = partitionFunction.getPartition(nextString);
-      assertEquals(actualPartition, expectedPartition);
+    byte[] bytes = new byte[7];
+    for (int expectedPartitionNumber : expectedPartitions) {
+      random.nextBytes(bytes);
+      String nextString = new String(bytes, UTF_8);
+      assertEquals(partitionFunction.getPartition(nextString), expectedPartitionNumber);
     }
   }
 
-  private void testPartitionFunctionEquivalenceWithStringAndByteArray(PartitionFunction partitionFunction,
-      int[] expectedPartitions) {
+  private void testMurmur3Hash(int hashSeed, int[] expectedHashValues, boolean useX64) {
     long seed = 100;
     Random random = new Random(seed);
 
     // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
     // Apply given partition function
     // compare with expectedPartitions
-    byte[] array = new byte[7];
-    for (int expectedPartitionNumber : expectedPartitions) {
-      random.nextBytes(array);
-      String nextString = new String(array, UTF_8);
-      int actualPartitionNumberFromString = partitionFunction.getPartition(nextString);
-      int actualPartitionNumberFromByteArray = partitionFunction.getPartition(nextString.getBytes(UTF_8));
-      assertEquals(actualPartitionNumberFromString, actualPartitionNumberFromByteArray);
-      assertEquals(actualPartitionNumberFromString, expectedPartitionNumber);
-    }
-  }
-
-  private void testPartitionFunctionEquivalenceWithLongArray(PartitionFunction partitionFunction,
-      int[] expectedPartitions) {
-    int seed = 100;
-    Random random = new Random(seed);
-
-    // Create a list of 10 long[] values using ArrayList, each of size 10.
-    List<long[]> longList = new ArrayList<>();
-
-    for (int i = 0; i < 10; i++) {
-      long[] longArray = new long[10];
-      for (int j = 0; j < 10; j++) {
-        longArray[j] = random.nextLong();
-      }
-      longList.add(longArray);
+    byte[] bytes = new byte[7];
+    for (int expectedHashValue : expectedHashValues) {
+      random.nextBytes(bytes);
+      String nextString = new String(bytes, UTF_8);
+      int actualHashValue = useX64 ? Murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed)
+          : Murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed);
+      assertEquals(actualHashValue, expectedHashValue);
     }
-
-    // Apply the partition function and compare with expected values.
-    for (int i = 0; i < 10; i++) {
-      int actualPartitionNumberFromLongArray = partitionFunction.getPartition(longList.get(i));
-      assertEquals(actualPartitionNumberFromLongArray, expectedPartitions[i]);
-    }
-  }
-
-  private void testMurmur3HashEquivalenceForDifferentDataTypes(int hashSeed, int[] expectedHashValues, String dataType,
-      String variant) {
-    long seed = 100;
-    Random random;
-    int numPartitions = 5;
-    Murmur3PartitionFunction murmur3PartitionFunction = new Murmur3PartitionFunction(numPartitions, null);
-
-    switch (dataType.toLowerCase()) {
-      case "string":
-        // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
-        // Apply given partition function
-        // compare with expectedPartitions
-        random = new Random(seed);
-        byte[] array1 = new byte[7];
-        for (int expectedHashValue : expectedHashValues) {
-          random.nextBytes(array1);
-          String nextString = new String(array1, UTF_8);
-          int actualHashValueFromString = murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed);
-          assertEquals(actualHashValueFromString, expectedHashValue);
-        }
-        break;
-      case "bytearray":
-        // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
-        // Apply given partition function
-        // compare with expectedPartitions
-        random = new Random(seed);
-        int actualHashValueFromByteArray;
-        byte[] array2 = new byte[7];
-        for (int expectedHashValue : expectedHashValues) {
-          random.nextBytes(array2);
-          if (variant.equals("x64_32")) {
-            actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX64(array2, hashSeed);
-          } else {
-            actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX86(array2, hashSeed);
-          }
-          assertEquals(actualHashValueFromByteArray, expectedHashValue);
-        }
-        break;
-      case "longarray":
-        random = new Random(seed);
-        // Create a list of 10 long[] values using ArrayList, each of size 10.
-        List<long[]> longList = new ArrayList<>();
-
-        for (int i = 0; i < 10; i++) {
-          long[] longArray = new long[10];
-          for (int j = 0; j < 10; j++) {
-            longArray[j] = random.nextLong();
-          }
-          longList.add(longArray);
-        }
-
-        // Apply the partition function and compare with expected values.
-        for (int i = 0; i < 10; i++) {
-          int actualHashValueFromLongArray = murmur3PartitionFunction.murmur3Hash32BitsX64(longList.get(i), hashSeed);
-          assertEquals(actualHashValueFromLongArray, expectedHashValues[i]);
-        }
-        break;
-        default:
-    }
-  }
-  private void testToStringAndPartitionNumber(PartitionFunction partitionFunction, int testValueForGetPartition,
-      int numPartitions) {
-    int partition1 = partitionFunction.getPartition(testValueForGetPartition);
-    int partition2 = partitionFunction.getPartition(Integer.toString(testValueForGetPartition));
-    assertEquals(partition1, partition2);
-    assertTrue(partition1 >= 0 && partition1 < numPartitions);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index a3dc1bbef5..e1218452d3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -207,12 +207,14 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
    * @param value Value for which String value needs to be returned
    * @return String value for the object.
    */
-  protected static String getStringValue(Object value) {
+  public static String getStringValue(Object value) {
+    if (value instanceof BigDecimal) {
+      return ((BigDecimal) value).toPlainString();
+    }
     if (value instanceof byte[]) {
       return BytesUtils.toHexString((byte[]) value);
-    } else {
-      return value.toString();
     }
+    return value.toString();
   }
 
   // Required by JSON de-serializer. DO NOT REMOVE.
@@ -562,6 +564,19 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
       }
     }
 
+    /**
+     * Converts the given value of the data type to string. The input value for BYTES type should be byte[].
+     */
+    public String toString(Object value) {
+      if (this == BIG_DECIMAL) {
+        return ((BigDecimal) value).toPlainString();
+      }
+      if (this == BYTES) {
+        return BytesUtils.toHexString((byte[]) value);
+      }
+      return value.toString();
+    }
+
     /**
      * Converts the given string value to the data type. Returns ByteArray for BYTES.
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org