You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/20 00:41:18 UTC
(pinot) branch master updated: Fix partition handling by always using string values (#12115)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c706fd04e5 Fix partition handling by always using string values (#12115)
c706fd04e5 is described below
commit c706fd04e57c8617e168619e3620da74efff7ea1
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Dec 19 16:41:13 2023 -0800
Fix partition handling by always using string values (#12115)
---
.../MultiPartitionColumnsSegmentPruner.java | 5 +-
.../SinglePartitionColumnSegmentPruner.java | 5 +-
.../request/context/RequestContextUtils.java | 35 ++-
.../query/pruner/ColumnValueSegmentPruner.java | 5 +-
.../core/query/pruner/ValueBasedSegmentPruner.java | 11 +-
.../partitioner/TableConfigPartitioner.java | 3 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 6 +-
.../stats/AbstractColumnStatisticsCollector.java | 13 +-
.../BigDecimalColumnPreIndexStatsCollector.java | 4 +-
.../stats/BytesColumnPredIndexStatsCollector.java | 4 +-
.../stats/DoubleColumnPreIndexStatsCollector.java | 4 +-
.../stats/FloatColumnPreIndexStatsCollector.java | 4 +-
.../stats/IntColumnPreIndexStatsCollector.java | 4 +-
.../stats/LongColumnPreIndexStatsCollector.java | 4 +-
.../stats/StringColumnPreIndexStatsCollector.java | 4 +-
.../BoundedColumnValuePartitionFunction.java | 4 +-
.../spi/partition/ByteArrayPartitionFunction.java | 4 +-
.../spi/partition/HashCodePartitionFunction.java | 4 +-
.../spi/partition/ModuloPartitionFunction.java | 15 +-
.../spi/partition/Murmur3PartitionFunction.java | 234 +++--------------
.../spi/partition/MurmurPartitionFunction.java | 6 +-
.../segment/spi/partition/PartitionFunction.java | 3 +-
.../spi/partition/PartitionFunctionFactory.java | 3 +-
.../spi/partition/PartitionFunctionTest.java | 277 ++++-----------------
.../java/org/apache/pinot/spi/data/FieldSpec.java | 21 +-
25 files changed, 194 insertions(+), 488 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
index b05ab4e55b..6970d3f54e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java
@@ -34,6 +34,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
@@ -140,7 +141,7 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
if (identifier != null) {
SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
return partitionInfo == null || partitionInfo.getPartitions().contains(
- partitionInfo.getPartitionFunction().getPartition(operands.get(1).getLiteral().getFieldValue()));
+ partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
} else {
return true;
}
@@ -155,7 +156,7 @@ public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
- .getPartition(operands.get(i).getLiteral().getFieldValue().toString()))) {
+ .getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
return true;
}
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
index 80ad4b7e3e..7016484e48 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
+import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
@@ -129,7 +130,7 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
- .getPartition(operands.get(1).getLiteral().getFieldValue().toString()));
+ .getPartition(RequestContextUtils.getStringValue(operands.get(1))));
} else {
return true;
}
@@ -140,7 +141,7 @@ public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
- .getPartition(operands.get(i).getLiteral().getFieldValue().toString()))) {
+ .getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
return true;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
index 0369b29f59..28f3037b25 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java
@@ -42,6 +42,8 @@ import org.apache.pinot.common.utils.RegexpPatternConverterUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.BigDecimalUtils;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.sql.FilterKind;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -259,15 +261,38 @@ public class RequestContextUtils {
}
}
- private static String getStringValue(Expression thriftExpression) {
- if (thriftExpression.getType() != ExpressionType.LITERAL) {
+ public static String getStringValue(Expression thriftExpression) {
+ Literal literal = thriftExpression.getLiteral();
+ if (literal == null) {
throw new BadQueryRequestException(
"Pinot does not support column or function on the right-hand side of the predicate");
}
- if (thriftExpression.getLiteral().getSetField() == Literal._Fields.NULL_VALUE) {
- return "null";
+ switch (literal.getSetField()) {
+ case BOOL_VALUE:
+ return Boolean.toString(literal.getBoolValue());
+ case BYTE_VALUE:
+ return Byte.toString(literal.getByteValue());
+ case SHORT_VALUE:
+ return Short.toString(literal.getShortValue());
+ case INT_VALUE:
+ return Integer.toString(literal.getIntValue());
+ case LONG_VALUE:
+ return Long.toString(literal.getLongValue());
+ case FLOAT_VALUE:
+ return Float.toString(literal.getFloatValue());
+ case DOUBLE_VALUE:
+ return Double.toString(literal.getDoubleValue());
+ case BIG_DECIMAL_VALUE:
+ return BigDecimalUtils.deserialize(literal.getBigDecimalValue()).toPlainString();
+ case STRING_VALUE:
+ return literal.getStringValue();
+ case BINARY_VALUE:
+ return BytesUtils.toHexString(literal.getBinaryValue());
+ case NULL_VALUE:
+ return "null";
+ default:
+ throw new IllegalStateException("Unsupported literal type: " + literal.getSetField());
}
- return thriftExpression.getLiteral().getFieldValue().toString();
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index a2b949da9e..8eead9f552 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -107,9 +107,8 @@ public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
assert dataSource != null;
DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, dataSourceMetadata.getDataType());
- Comparable value = cachedValue.getComparableValue();
// Check min/max value
- if (!checkMinMaxRange(dataSourceMetadata, value)) {
+ if (!checkMinMaxRange(dataSourceMetadata, cachedValue.getComparableValue())) {
return true;
}
// Check column partition
@@ -117,7 +116,7 @@ public class ColumnValueSegmentPruner extends ValueBasedSegmentPruner {
if (partitionFunction != null) {
Set<Integer> partitions = dataSourceMetadata.getPartitions();
assert partitions != null;
- if (!partitions.contains(partitionFunction.getPartition(value))) {
+ if (!partitions.contains(partitionFunction.getPartition(cachedValue.getValue()))) {
return true;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
index 76f1b2737b..2e3214c8c2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -200,17 +200,21 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
}
public static class CachedValue {
- private final Object _value;
+ private final String _value;
private boolean _hashed = false;
private long _hash1;
private long _hash2;
private DataType _dt;
private Comparable _comparableValue;
- private CachedValue(Object value) {
+ private CachedValue(String value) {
_value = value;
}
+ public String getValue() {
+ return _value;
+ }
+
public Comparable getComparableValue() {
assert _dt != null;
return _comparableValue;
@@ -218,9 +222,8 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
public void ensureDataType(DataType dt) {
if (dt != _dt) {
- String strValue = _value.toString();
_dt = dt;
- _comparableValue = convertValue(strValue, dt);
+ _comparableValue = convertValue(_value, dt);
_hashed = false;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
index 4a088f129d..cff7f3b745 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.processing.partitioner;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -41,6 +42,6 @@ public class TableConfigPartitioner implements Partitioner {
@Override
public String getPartition(GenericRow genericRow) {
- return String.valueOf(_partitionFunction.getPartition(genericRow.getValue(_column)));
+ return String.valueOf(_partitionFunction.getPartition(FieldSpec.getStringValue(genericRow.getValue(_column))));
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 36f5d6e81c..55e0aec072 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -680,13 +680,13 @@ public class MutableSegmentImpl implements MutableSegment {
if (fieldSpec.isSingleValueField()) {
// Check partitions
if (column.equals(_partitionColumn)) {
- Object valueToPartition = (dataType == BYTES) ? new ByteArray((byte[]) value) : value;
- int partition = _partitionFunction.getPartition(valueToPartition);
+ String stringValue = dataType.toString(value);
+ int partition = _partitionFunction.getPartition(stringValue);
if (partition != _mainPartitionId) {
if (indexContainer._partitions.add(partition)) {
// for every partition other than mainPartitionId, log a warning once
_logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column,
- valueToPartition);
+ stringValue);
}
// always emit a metric when a partition other than mainPartitionId is detected
if (_serverMetrics != null) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
index 72f8eabb74..5af1e14999 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java
@@ -110,6 +110,7 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
* Returns the {@link PartitionFunction} for the column.
* @return Partition function for the column.
*/
+ @Nullable
public PartitionFunction getPartitionFunction() {
return _partitionFunction;
}
@@ -143,18 +144,20 @@ public abstract class AbstractColumnStatisticsCollector implements ColumnStatist
return _partitions;
}
+ protected boolean isPartitionEnabled() {
+ return _partitionFunction != null;
+ }
+
/**
* Updates the partition range based on the partition of the given value.
*
* @param value Column value.
*/
- protected void updatePartition(Object value) {
- if (_partitionFunction != null) {
- _partitions.add(_partitionFunction.getPartition(value));
- }
+ protected void updatePartition(String value) {
+ _partitions.add(_partitionFunction.getPartition(value));
}
- void updateTotalNumberOfEntries(Object[] entries) {
+ protected void updateTotalNumberOfEntries(Object[] entries) {
_totalNumberOfEntries += entries.length;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
index 1c4b7913ed..cb1fb992b3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BigDecimalColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class BigDecimalColumnPreIndexStatsCollector extends AbstractColumnStatis
int length = BigDecimalUtils.byteSize(value);
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(value.toPlainString());
+ }
_minLength = Math.min(_minLength, length);
_maxLength = Math.max(_maxLength, length);
_maxRowLength = _maxLength;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
index 90732d1acc..34cb0ecc1e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/BytesColumnPredIndexStatsCollector.java
@@ -62,7 +62,9 @@ public class BytesColumnPredIndexStatsCollector extends AbstractColumnStatistics
ByteArray value = new ByteArray((byte[]) entry);
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(value.toString());
+ }
int length = value.length();
_minLength = Math.min(_minLength, length);
_maxLength = Math.max(_maxLength, length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
index 2a80f353b8..cd8c9cf4b0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/DoubleColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class DoubleColumnPreIndexStatsCollector extends AbstractColumnStatistics
double value = (double) entry;
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(Double.toString(value));
+ }
}
_totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
index 09051c6517..fa117aa65e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/FloatColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class FloatColumnPreIndexStatsCollector extends AbstractColumnStatisticsC
float value = (float) entry;
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(Float.toString(value));
+ }
}
_totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
index d65429e2d7..3675cdd9e4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/IntColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class IntColumnPreIndexStatsCollector extends AbstractColumnStatisticsCol
int value = (int) entry;
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(Integer.toString(value));
+ }
}
_totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
index 39cb4fa54b..5951444b87 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/LongColumnPreIndexStatsCollector.java
@@ -51,7 +51,9 @@ public class LongColumnPreIndexStatsCollector extends AbstractColumnStatisticsCo
long value = (long) entry;
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(Long.toString(value));
+ }
}
_totalNumberOfEntries++;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
index 980fcd5ba5..b896ac30ff 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/StringColumnPreIndexStatsCollector.java
@@ -62,7 +62,9 @@ public class StringColumnPreIndexStatsCollector extends AbstractColumnStatistics
String value = (String) entry;
addressSorted(value);
if (_values.add(value)) {
- updatePartition(value);
+ if (isPartitionEnabled()) {
+ updatePartition(value);
+ }
int valueLength = value.getBytes(UTF_8).length;
_minLength = Math.min(_minLength, valueLength);
_maxLength = Math.max(_maxLength, valueLength);
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
index 86fc245275..a72beb34c8 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/BoundedColumnValuePartitionFunction.java
@@ -62,9 +62,9 @@ public class BoundedColumnValuePartitionFunction implements PartitionFunction {
}
@Override
- public int getPartition(Object value) {
+ public int getPartition(String value) {
for (int i = 0; i < _numPartitions - 1; i++) {
- if (_values[i].equalsIgnoreCase(value.toString())) {
+ if (_values[i].equalsIgnoreCase(value)) {
return i + 1;
}
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
index aa970216a3..e8ff8edc24 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ByteArrayPartitionFunction.java
@@ -42,8 +42,8 @@ public class ByteArrayPartitionFunction implements PartitionFunction {
}
@Override
- public int getPartition(Object value) {
- return abs(Arrays.hashCode(value.toString().getBytes(UTF_8))) % _numPartitions;
+ public int getPartition(String value) {
+ return abs(Arrays.hashCode(value.getBytes(UTF_8))) % _numPartitions;
}
@Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
index 11d34e3dc9..182760cf44 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/HashCodePartitionFunction.java
@@ -37,8 +37,8 @@ public class HashCodePartitionFunction implements PartitionFunction {
}
@Override
- public int getPartition(Object value) {
- return abs(value.toString().hashCode()) % _numPartitions;
+ public int getPartition(String value) {
+ return abs(value.hashCode()) % _numPartitions;
}
@Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
index 7e2ae7bf79..a4b8eb49ab 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/ModuloPartitionFunction.java
@@ -49,19 +49,8 @@ public class ModuloPartitionFunction implements PartitionFunction {
* @return Partition id for the given value.
*/
@Override
- public int getPartition(Object value) {
- if (value instanceof Integer) {
- return toNonNegative((Integer) value % _numPartitions);
- } else if (value instanceof Long) {
- // Since _numPartitions is int, the modulo should also be int.
- return toNonNegative((int) ((Long) value % _numPartitions));
- } else if (value instanceof String) {
- // Parse String as Long, to support both Integer and Long.
- return toNonNegative((int) (Long.parseLong((String) value) % _numPartitions));
- } else {
- throw new IllegalArgumentException(
- "Illegal argument for partitioning, expected Integer, got: " + value.getClass().getSimpleName());
- }
+ public int getPartition(String value) {
+ return toNonNegative((int) (Long.parseLong(value) % _numPartitions));
}
@Override
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
index d66eeca1ae..dad3caa6b2 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -33,10 +34,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
public static final byte INVALID_CHAR = (byte) '?';
private static final String NAME = "Murmur3";
private static final String SEED_KEY = "seed";
- private static final String MURMUR3_VARIANT = "variant";
+ private static final String VARIANT_KEY = "variant";
private final int _numPartitions;
- private final int _hashSeed;
- private final String _variant;
+ private final int _seed;
+ private final boolean _useX64;
/**
* Constructor for the class.
@@ -45,29 +46,33 @@ public class Murmur3PartitionFunction implements PartitionFunction {
*/
public Murmur3PartitionFunction(int numPartitions, Map<String, String> functionConfig) {
Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
- Preconditions.checkArgument(
- functionConfig == null || functionConfig.get(MURMUR3_VARIANT) == null || functionConfig.get(MURMUR3_VARIANT)
- .isEmpty() || functionConfig.get(MURMUR3_VARIANT).equals("x86_32") || functionConfig.get(MURMUR3_VARIANT)
- .equals("x64_32"), "Murmur3 variant must be either x86_32 or x64_32");
_numPartitions = numPartitions;
- // default value of the hash seed is 0.
- _hashSeed =
- (functionConfig == null || functionConfig.get(SEED_KEY) == null || functionConfig.get(SEED_KEY).isEmpty()) ? 0
- : Integer.parseInt(functionConfig.get(SEED_KEY));
-
- // default value of the murmur3 variant is x86_32.
- _variant =
- (functionConfig == null || functionConfig.get(MURMUR3_VARIANT) == null || functionConfig.get(MURMUR3_VARIANT)
- .isEmpty()) ? "x86_32" : functionConfig.get(MURMUR3_VARIANT);
+ int seed = 0;
+ boolean useX64 = false;
+ if (functionConfig != null) {
+ String seedString = functionConfig.get(SEED_KEY);
+ if (StringUtils.isNotEmpty(seedString)) {
+ seed = Integer.parseInt(seedString);
+ }
+ String variantString = functionConfig.get(VARIANT_KEY);
+ if (StringUtils.isNotEmpty(variantString)) {
+ if (variantString.equals("x64_32")) {
+ useX64 = true;
+ } else {
+ Preconditions.checkArgument(variantString.equals("x86_32"),
+ "Murmur3 variant must be either x86_32 or x64_32");
+ }
+ }
+ }
+ _seed = seed;
+ _useX64 = useX64;
}
@Override
- public int getPartition(Object value) {
- if (_variant.equals("x86_32")) {
- return (murmur3Hash32BitsX86(value.toString().getBytes(UTF_8), _hashSeed) & Integer.MAX_VALUE) % _numPartitions;
- }
- return (murmur3Hash32BitsX64(value, _hashSeed) & Integer.MAX_VALUE) % _numPartitions;
+ public int getPartition(String value) {
+ int hash = _useX64 ? murmur3Hash32BitsX64(value, _seed) : murmur3Hash32BitsX86(value.getBytes(UTF_8), _seed);
+ return (hash & Integer.MAX_VALUE) % _numPartitions;
}
@Override
@@ -87,8 +92,8 @@ public class Murmur3PartitionFunction implements PartitionFunction {
}
@VisibleForTesting
- int murmur3Hash32BitsX86(byte[] data, int hashSeed) {
- return Hashing.murmur3_32_fixed(hashSeed).hashBytes(data).asInt();
+ static int murmur3Hash32BitsX86(byte[] data, int seed) {
+ return Hashing.murmur3_32_fixed(seed).hashBytes(data).asInt();
}
/**
@@ -110,14 +115,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
* @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a>
*/
- private long getblock(byte[] key, int i) {
- return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] & 0x00000000000000FFL) << 8) | (
- (key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] & 0x00000000000000FFL) << 24) | (
- (key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] & 0x00000000000000FFL) << 40) | (
- (key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] & 0x00000000000000FFL) << 56);
- }
-
- private void bmix(State state) {
+ private static void bmix(State state) {
state._k1 *= state._c1;
state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
state._k1 *= state._c2;
@@ -139,7 +137,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
state._c2 = state._c2 * 5 + 0x6bce6396;
}
- private long fmix(long k) {
+ private static long fmix(long k) {
k ^= k >>> 33;
k *= 0xff51afd7ed558ccdL;
k ^= k >>> 33;
@@ -149,168 +147,8 @@ public class Murmur3PartitionFunction implements PartitionFunction {
return k;
}
- /**
- * Hash a value using the x64 64 bit variant of MurmurHash3
- *
- * @param key value to hash
- * @param seed random value
- * @return 64 bit hashed key
- */
- private long murmur3Hash64BitsX64(final byte[] key, final int seed) {
- State state = new State();
-
- state._h1 = 0x9368e53c2f6af274L ^ seed;
- state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
- state._c1 = 0x87c37b91114253d5L;
- state._c2 = 0x4cf5ad432745937fL;
-
- for (int i = 0; i < key.length / 16; i++) {
- state._k1 = getblock(key, i * 2 * 8);
- state._k2 = getblock(key, (i * 2 + 1) * 8);
-
- bmix(state);
- }
-
- state._k1 = 0;
- state._k2 = 0;
-
- int tail = (key.length >>> 4) << 4;
-
- // CHECKSTYLE:OFF
- switch (key.length & 15) {
- case 15:
- state._k2 ^= (long) key[tail + 14] << 48;
- case 14:
- state._k2 ^= (long) key[tail + 13] << 40;
- case 13:
- state._k2 ^= (long) key[tail + 12] << 32;
- case 12:
- state._k2 ^= (long) key[tail + 11] << 24;
- case 11:
- state._k2 ^= (long) key[tail + 10] << 16;
- case 10:
- state._k2 ^= (long) key[tail + 9] << 8;
- case 9:
- state._k2 ^= key[tail + 8];
- case 8:
- state._k1 ^= (long) key[tail + 7] << 56;
- case 7:
- state._k1 ^= (long) key[tail + 6] << 48;
- case 6:
- state._k1 ^= (long) key[tail + 5] << 40;
- case 5:
- state._k1 ^= (long) key[tail + 4] << 32;
- case 4:
- state._k1 ^= (long) key[tail + 3] << 24;
- case 3:
- state._k1 ^= (long) key[tail + 2] << 16;
- case 2:
- state._k1 ^= (long) key[tail + 1] << 8;
- case 1:
- state._k1 ^= key[tail + 0];
- bmix(state);
- }
-
- // CHECKSTYLE:ON
- state._h2 ^= key.length;
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- state._h1 = fmix(state._h1);
- state._h2 = fmix(state._h2);
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- return state._h1;
- }
-
- /**
- * Hash a value using the x64 32 bit variant of MurmurHash3
- *
- * @param key value to hash
- * @param seed random value
- * @return 32 bit hashed key
- */
- private int murmur3Hash32BitsX64(final byte[] key, final int seed) {
- return (int) (murmur3Hash64BitsX64(key, seed) >>> 32);
- }
-
- private long murmur3Hash64BitsX64(final long[] key, final int seed) {
- // Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
- State state = new State();
-
- state._h1 = 0x9368e53c2f6af274L ^ seed;
- state._h2 = 0x586dcd208f7cd3fdL ^ seed;
-
- state._c1 = 0x87c37b91114253d5L;
- state._c2 = 0x4cf5ad432745937fL;
-
- for (int i = 0; i < key.length / 2; i++) {
- state._k1 = key[i * 2];
- state._k2 = key[i * 2 + 1];
-
- bmix(state);
- }
-
- long tail = key[key.length - 1];
-
- if (key.length % 2 != 0) {
- state._k1 ^= tail;
- bmix(state);
- }
-
- state._h2 ^= key.length * 8;
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- state._h1 = fmix(state._h1);
- state._h2 = fmix(state._h2);
-
- state._h1 += state._h2;
- state._h2 += state._h1;
-
- return state._h1;
- }
-
- /**
- * Hash a value using the x64 32 bit variant of MurmurHash3
- *
- * @param key value to hash
- * @param seed random value
- * @return 32 bit hashed key
- */
- private int murmur3Hash32BitsX64(final long[] key, final int seed) {
- return (int) (murmur3Hash64BitsX64(key, seed) >>> 32);
- }
-
@VisibleForTesting
- int murmur3Hash32BitsX64(Object o, int seed) {
- if (o instanceof byte[]) {
- return murmur3Hash32BitsX64((byte[]) o, seed);
- } else if (o instanceof long[]) {
- return murmur3Hash32BitsX64((long[]) o, seed);
- } else if (o instanceof String) {
- return murmur3Hash32BitsX64((String) o, seed);
- } else {
- // Differing from the source implementation here. The default case in the source implementation is to apply the
- // hash on the hashcode of the object. The hashcode of an object is not guaranteed to be consistent across JVMs
- // (except for String values), so we cannot guarantee the same value as the data source. Since we cannot
- // guarantee similar values, we will instead apply the hash on the string representation of the object, which
- // aligns with the rest of our code base.
- return murmur3Hash32BitsX64(o.toString(), seed);
- }
- }
-
- private int murmur3Hash32BitsX64(String s, int seed) {
- return (int) (murmur3Hash32BitsX64String(s, seed) >> 32);
- }
-
- private long murmur3Hash32BitsX64String(String s, long seed) {
- // Exactly the same as MurmurHash3_x64_64, except it works directly on a String's chars
+ static int murmur3Hash32BitsX64(String s, int seed) {
State state = new State();
state._h1 = 0x9368e53c2f6af274L ^ seed;
@@ -370,13 +208,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
}
}
- // CHECKSTYLE:ON
long savedK1 = state._k1;
long savedK2 = state._k2;
state._k1 = 0;
state._k2 = 0;
-
- // CHECKSTYLE:OFF
switch (byteLen & 15) {
case 15:
state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
@@ -411,6 +246,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
bmix(state);
}
// CHECKSTYLE:ON
+
state._h2 ^= byteLen;
state._h1 += state._h2;
@@ -422,10 +258,10 @@ public class Murmur3PartitionFunction implements PartitionFunction {
state._h1 += state._h2;
state._h2 += state._h1;
- return state._h1;
+ return (int) (state._h1 >> 32);
}
- private void addByte(State state, byte b, int len) {
+ private static void addByte(State state, byte b, int len) {
int shift = (len & 0x7) * 8;
long bb = (b & 0xffL) << shift;
if ((len & 0x8) == 0) {
@@ -440,7 +276,7 @@ public class Murmur3PartitionFunction implements PartitionFunction {
}
}
- static class State {
+ private static class State {
long _h1;
long _h2;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
index 7d2b9b8f31..a43a49f789 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
@@ -41,8 +41,8 @@ public class MurmurPartitionFunction implements PartitionFunction {
}
@Override
- public int getPartition(Object value) {
- return (murmur2(value.toString().getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions;
+ public int getPartition(String value) {
+ return (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % _numPartitions;
}
@Override
@@ -69,7 +69,7 @@ public class MurmurPartitionFunction implements PartitionFunction {
* @return 32 bit hash of the given array
*/
@VisibleForTesting
- int murmur2(final byte[] data) {
+ static int murmur2(final byte[] data) {
int length = data.length;
int seed = 0x9747b28c;
// 'm' and 'r' are mixing constants generated offline.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
index e6404e48f2..3e6d26146d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java
@@ -34,11 +34,12 @@ public interface PartitionFunction extends Serializable {
/**
* Method to compute and return partition id for the given value.
+ * NOTE: The value is expected to be a string representation of the actual value.
*
* @param value Value for which to determine the partition id.
* @return partition id for the value.
*/
- int getPartition(Object value);
+ int getPartition(String value);
/**
* Returns the name of the partition function.
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 13a60b424b..3f35c80cb4 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.spi.partition;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
/**
@@ -68,7 +69,7 @@ public class PartitionFunctionFactory {
// The PartitionFunctionFactory should be able to support these default implementations, as well as instantiate
// based on config
public static PartitionFunction getPartitionFunction(String functionName, int numPartitions,
- Map<String, String> functionConfig) {
+ @Nullable Map<String, String> functionConfig) {
PartitionFunctionType function = PartitionFunctionType.fromString(functionName);
switch (function) {
case Modulo:
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index dec22a663d..a80f65d054 100644
--- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -19,9 +19,7 @@
package org.apache.pinot.segment.spi.partition;
import com.fasterxml.jackson.databind.JsonNode;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -68,8 +66,6 @@ public class PartitionFunctionTest {
if (expectedPartition < 0) {
expectedPartition += numPartitions;
}
- assertEquals(partitionFunction.getPartition(value), expectedPartition);
- assertEquals(partitionFunction.getPartition((long) value), expectedPartition);
assertEquals(partitionFunction.getPartition(Integer.toString(value)), expectedPartition);
}
@@ -80,7 +76,6 @@ public class PartitionFunctionTest {
if (expectedPartition < 0) {
expectedPartition += numPartitions;
}
- assertEquals(partitionFunction.getPartition(value), expectedPartition);
assertEquals(partitionFunction.getPartition(Long.toString(value)), expectedPartition);
}
}
@@ -90,7 +85,6 @@ public class PartitionFunctionTest {
* Unit test for {@link MurmurPartitionFunction}.
* <ul>
* <li> Tests that partition values are in expected range. </li>
- * <li> Tests that toString returns expected string. </li>
* </ul>
*/
@Test
@@ -109,10 +103,7 @@ public class PartitionFunctionTest {
for (int j = 0; j < NUM_ROUNDS; j++) {
int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
- int partition1 = partitionFunction.getPartition(value);
- int partition2 = partitionFunction.getPartition(Integer.toString(value));
- assertEquals(partition1, partition2);
- assertTrue(partition1 >= 0 && partition1 < numPartitions);
+ testPartitionInExpectedRange(partitionFunction, value, numPartitions);
}
}
}
@@ -220,26 +211,26 @@ public class PartitionFunctionTest {
int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
// check for the partition function with functionConfig as null.
- testToStringAndPartitionNumber(partitionFunction1, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction1, value, numPartitions);
// check for the partition function with non-null functionConfig but without seed value.
- testToStringAndPartitionNumber(partitionFunction2, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction2, value, numPartitions);
// check for the partition function with non-null functionConfig and with seed value.
- testToStringAndPartitionNumber(partitionFunction3, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction3, value, numPartitions);
// check for the partition function with non-null functionConfig and with seed value and variant.
- testToStringAndPartitionNumber(partitionFunction4, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction4, value, numPartitions);
// check for the partition function with non-null functionConfig and with explicitly provided default seed
// value and variant.
- testToStringAndPartitionNumber(partitionFunction5, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction5, value, numPartitions);
// check for the partition function with non-null functionConfig and with empty seed value and default variant.
- testToStringAndPartitionNumber(partitionFunction6, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction6, value, numPartitions);
// check for the partition function with non-null functionConfig and with empty seed value and empty variant.
- testToStringAndPartitionNumber(partitionFunction7, value, numPartitions);
+ testPartitionInExpectedRange(partitionFunction7, value, numPartitions);
}
}
}
@@ -247,50 +238,18 @@ public class PartitionFunctionTest {
@Test
public void testMurmur3Equivalence() {
- // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
- // expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray.
- int[] expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray = new int[]{
- -1569442405, -921191038, 16439113, -881572510, 2111401876, 655879980, 1409856380, -1348364123, -1770645361,
- 1277101955
- };
-
- // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
- // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray.
- int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray = new int[]{
- 698083240, 174075836, -938825597, 155806634, -831733828, 319389887, -939822329, -1785781936, -1796939240,
- 757512622
- };
-
- // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
- // expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray.
- int[] expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray = new int[]{
- -621156783, -1341356662, 1615513844, 1608247599, -1339558745, -1782606270, 1204009437, 8939246, -42073819,
- 1268621125
- };
-
- // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
- // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray.
- int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray = new int[]{
- -159780599, 1266925141, -2039451704, 237323842, -1373894107, -1718192521, 314068498, 1377198162, 1239340429,
- -1643307044
- };
-
// 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
// Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 0, applied right shift
- // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithZeroSeedForString.
- int[] expectedMurmurValuesFor32BitX64WithZeroSeedForString = new int[]{
+ // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithZeroSeed.
+ int[] expectedMurmurValuesFor32BitX64WithZeroSeed = new int[]{
-930531654, 1010637996, -1251084035, -1551293561, 1591443335, 181872103, 1308755538, -432310401, -701537488,
674867586
};
// 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
// Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 0, applied right shift
- // on 32 bits those values and stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForString.
- int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForString = new int[]{
+ // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithNonZeroSeed.
+ int[] expectedMurmurValuesFor32BitX64WithNonZeroSeed = new int[]{
1558697417, 933581816, 1071120824, 1964512897, 1629803052, 2037246152, -1867319466, -1003065762, -275998120,
1386652858
};
@@ -311,44 +270,23 @@ public class PartitionFunctionTest {
-19253806
};
- // Test for 32 bit murmur3 hash with x64_64 variant and seed = 0 for byte array.
- testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray,
- "byteArray", "x64_32");
-
- // Test for 32 bit murmur3 hash with x64_64 variant and seed = 9001 for byte array.
- testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray,
- "byteArray", "x64_32");
-
- // Test for 32 bit murmur3 hash with x64_64 variant and seed = 0 for long array.
- testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray,
- "longArray", "x64_32");
-
- // Test for 32 bit murmur3 hash with x64_64 variant and seed = 9001 for long array.
- testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray,
- "longArray", "x64_32");
-
// Test for 64 bit murmur3 hash with x64_64 variant and seed = 0 for String.
- testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForString, "String",
- "x64_32");
+ testMurmur3Hash(0, expectedMurmurValuesFor32BitX64WithZeroSeed, true);
// Test for 64 bit murmur3 hash with x64_64 variant and seed = 9001 for String.
- testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForString,
- "String", "x64_32");
+ testMurmur3Hash(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeed, true);
// Test for 32 bit murmur3 hash with x86_32 variant and seed = 0 for byte array.
- testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX86WithZeroSeed, "byteArray",
- "x86_32");
+ testMurmur3Hash(0, expectedMurmurValuesFor32BitX86WithZeroSeed, false);
// Test for 32 bit murmur3 hash with x86_32 variant and seed = 9001 for byte array.
- testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, "byteArray",
- "x86_32");
+ testMurmur3Hash(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, false);
}
/**
* Unit test for {@link MurmurPartitionFunction}.
* <ul>
* <li> Tests that partition values are in expected range. </li>
- * <li> Tests that toString returns expected string. </li>
* </ul>
*/
@Test
@@ -367,10 +305,7 @@ public class PartitionFunctionTest {
for (int j = 0; j < NUM_ROUNDS; j++) {
int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
- int partition1 = partitionFunction.getPartition(value);
- int partition2 = partitionFunction.getPartition(Integer.toString(value));
- assertEquals(partition1, partition2);
- assertTrue(partition1 >= 0 && partition1 < numPartitions);
+ testPartitionInExpectedRange(partitionFunction, value, numPartitions);
}
}
}
@@ -394,7 +329,6 @@ public class PartitionFunctionTest {
int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
int hashCode = Integer.toString(value).hashCode();
int expectedPartition = ((hashCode == Integer.MIN_VALUE) ? 0 : Math.abs(hashCode)) % numPartitions;
- assertEquals(partitionFunction.getPartition(value), expectedPartition);
assertEquals(partitionFunction.getPartition(Integer.toString(value)), expectedPartition);
}
@@ -403,7 +337,6 @@ public class PartitionFunctionTest {
double value = j == 0 ? Double.NEGATIVE_INFINITY : random.nextDouble();
int hashCode = Double.toString(value).hashCode();
int expectedPartition = ((hashCode == Integer.MIN_VALUE) ? 0 : Math.abs(hashCode)) % numPartitions;
- assertEquals(partitionFunction.getPartition(value), expectedPartition);
assertEquals(partitionFunction.getPartition(Double.toString(value)), expectedPartition);
}
}
@@ -467,17 +400,13 @@ public class PartitionFunctionTest {
long seed = 100;
Random random = new Random(seed);
- int numPartitions = 5;
- MurmurPartitionFunction murmurPartitionFunction = new MurmurPartitionFunction(numPartitions);
-
// Generate the same values as above - 10 random values of size 7, using {@link Random::nextBytes} with seed 100
// Apply {@link MurmurPartitionFunction::murmur2
// compare with stored results
- byte[] array = new byte[7];
+ byte[] bytes = new byte[7];
for (int expectedMurmurValue : expectedMurmurValues) {
- random.nextBytes(array);
- int actualMurmurValue = murmurPartitionFunction.murmur2(array);
- assertEquals(actualMurmurValue, expectedMurmurValue);
+ random.nextBytes(bytes);
+ assertEquals(MurmurPartitionFunction.murmur2(bytes), expectedMurmurValue);
}
}
@@ -501,7 +430,7 @@ public class PartitionFunctionTest {
// generate the same 10 String values
// Apply the partition function and compare with stored results
- testPartitionFunctionEquivalence(murmurPartitionFunction, expectedPartitions);
+ testPartitionFunction(murmurPartitionFunction, expectedPartitions);
}
@Test
@@ -510,31 +439,17 @@ public class PartitionFunctionTest {
// 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
// Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
// expectedMurmurValuesFor32BitX64WithZeroSeed.
- int[] expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString = new int[]{
+ int[] expectedPartitions32BitsX64WithZeroSeed = new int[]{
4, 1, 3, 2, 0, 3, 3, 2, 0, 1
};
// 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
// Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
// stored in expectedMurmurValuesFor32BitX64WithZeroSeed.
- int[] expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString = new int[]{
+ int[] expectedPartitions32BitsX64WithNonZeroSeed = new int[]{
2, 1, 4, 2, 2, 2, 2, 1, 3, 3
};
- // 10 long[] values of size 10, were randomly generated, using {@link Random::nextLong} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
- // expectedPartitions32BitsX64WithZeroSeedForLongArray.
- int[] expectedPartitions32BitsX64WithZeroSeedForLongArray = new int[]{
- 0, 1, 4, 4, 3, 3, 2, 1, 4, 0
- };
-
- // 10 long[] values of size 10, were randomly generated, using {@link Random::nextLong} with seed 100
- // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
- // expectedPartitions32BitsX64WithNonZeroSeedForLongArray.
- int[] expectedPartitions32BitsX64WithNonZeroSeedForLongArray = new int[]{
- 4, 1, 4, 2, 1, 2, 3, 2, 4, 4
- };
-
// 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
// Applied com.google.common.hash.hashing::murmur3_32_fixed with seed = 0 to those values and stored in
// expectedMurmurValuesFor32BitX64WithZeroSeed.
@@ -575,22 +490,13 @@ public class PartitionFunctionTest {
// Generate the same 10 String values. Test if the calculated values are equal for both String and byte[] (they
// should be equal when String is converted to byte[]) and if the values are equal to the expected values for the
// x64_32 variant with seed = 0 and x64_32 variant with seed = 9001.
- testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction1,
- expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString);
- testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction2,
- expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString);
-
- // Generate the same 10 long[] values. Test if the calculated values are equal to the expected values for the x64_32
- // variant with seed = 0 and x64_32 variant with seed = 9001.
- testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction1,
- expectedPartitions32BitsX64WithZeroSeedForLongArray);
- testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction2,
- expectedPartitions32BitsX64WithNonZeroSeedForLongArray);
+ testPartitionFunction(murmur3PartitionFunction1, expectedPartitions32BitsX64WithZeroSeed);
+ testPartitionFunction(murmur3PartitionFunction2, expectedPartitions32BitsX64WithNonZeroSeed);
// Generate the same 10 String values. Test if the calculated values are equal to the expected values for the x86_32
// variant with seed = 0 and x86_32 variant with seed = 9001.
- testPartitionFunctionEquivalence(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
- testPartitionFunctionEquivalence(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
+ testPartitionFunction(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
+ testPartitionFunction(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
}
/**
@@ -609,132 +515,43 @@ public class PartitionFunctionTest {
// generate the same 10 String values
// Apply the partition function and compare with stored results
- testPartitionFunctionEquivalence(byteArrayPartitionFunction, expectedPartitions);
+ testPartitionFunction(byteArrayPartitionFunction, expectedPartitions);
}
- private void testPartitionFunctionEquivalence(PartitionFunction partitionFunction, int[] expectedPartitions) {
+ private void testPartitionInExpectedRange(PartitionFunction partitionFunction, Object value, int numPartitions) {
+ int partition = partitionFunction.getPartition(value.toString());
+ assertTrue(partition >= 0 && partition < numPartitions);
+ }
+
+ private void testPartitionFunction(PartitionFunction partitionFunction, int[] expectedPartitions) {
long seed = 100;
Random random = new Random(seed);
// Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
// Apply given partition function
// compare with expectedPartitions
- byte[] array = new byte[7];
- for (int expectedPartition : expectedPartitions) {
- random.nextBytes(array);
- String nextString = new String(array, UTF_8);
- int actualPartition = partitionFunction.getPartition(nextString);
- assertEquals(actualPartition, expectedPartition);
+ byte[] bytes = new byte[7];
+ for (int expectedPartitionNumber : expectedPartitions) {
+ random.nextBytes(bytes);
+ String nextString = new String(bytes, UTF_8);
+ assertEquals(partitionFunction.getPartition(nextString), expectedPartitionNumber);
}
}
- private void testPartitionFunctionEquivalenceWithStringAndByteArray(PartitionFunction partitionFunction,
- int[] expectedPartitions) {
+ private void testMurmur3Hash(int hashSeed, int[] expectedHashValues, boolean useX64) {
long seed = 100;
Random random = new Random(seed);
// Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
// Apply given partition function
// compare with expectedPartitions
- byte[] array = new byte[7];
- for (int expectedPartitionNumber : expectedPartitions) {
- random.nextBytes(array);
- String nextString = new String(array, UTF_8);
- int actualPartitionNumberFromString = partitionFunction.getPartition(nextString);
- int actualPartitionNumberFromByteArray = partitionFunction.getPartition(nextString.getBytes(UTF_8));
- assertEquals(actualPartitionNumberFromString, actualPartitionNumberFromByteArray);
- assertEquals(actualPartitionNumberFromString, expectedPartitionNumber);
- }
- }
-
- private void testPartitionFunctionEquivalenceWithLongArray(PartitionFunction partitionFunction,
- int[] expectedPartitions) {
- int seed = 100;
- Random random = new Random(seed);
-
- // Create a list of 10 long[] values using ArrayList, each of size 10.
- List<long[]> longList = new ArrayList<>();
-
- for (int i = 0; i < 10; i++) {
- long[] longArray = new long[10];
- for (int j = 0; j < 10; j++) {
- longArray[j] = random.nextLong();
- }
- longList.add(longArray);
+ byte[] bytes = new byte[7];
+ for (int expectedHashValue : expectedHashValues) {
+ random.nextBytes(bytes);
+ String nextString = new String(bytes, UTF_8);
+ int actualHashValue = useX64 ? Murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed)
+ : Murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed);
+ assertEquals(actualHashValue, expectedHashValue);
}
-
- // Apply the partition function and compare with expected values.
- for (int i = 0; i < 10; i++) {
- int actualPartitionNumberFromLongArray = partitionFunction.getPartition(longList.get(i));
- assertEquals(actualPartitionNumberFromLongArray, expectedPartitions[i]);
- }
- }
-
- private void testMurmur3HashEquivalenceForDifferentDataTypes(int hashSeed, int[] expectedHashValues, String dataType,
- String variant) {
- long seed = 100;
- Random random;
- int numPartitions = 5;
- Murmur3PartitionFunction murmur3PartitionFunction = new Murmur3PartitionFunction(numPartitions, null);
-
- switch (dataType.toLowerCase()) {
- case "string":
- // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
- // Apply given partition function
- // compare with expectedPartitions
- random = new Random(seed);
- byte[] array1 = new byte[7];
- for (int expectedHashValue : expectedHashValues) {
- random.nextBytes(array1);
- String nextString = new String(array1, UTF_8);
- int actualHashValueFromString = murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed);
- assertEquals(actualHashValueFromString, expectedHashValue);
- }
- break;
- case "bytearray":
- // Generate 10 random String values of size 7, using {@link Random::nextBytes} with seed 100
- // Apply given partition function
- // compare with expectedPartitions
- random = new Random(seed);
- int actualHashValueFromByteArray;
- byte[] array2 = new byte[7];
- for (int expectedHashValue : expectedHashValues) {
- random.nextBytes(array2);
- if (variant.equals("x64_32")) {
- actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX64(array2, hashSeed);
- } else {
- actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX86(array2, hashSeed);
- }
- assertEquals(actualHashValueFromByteArray, expectedHashValue);
- }
- break;
- case "longarray":
- random = new Random(seed);
- // Create a list of 10 long[] values using ArrayList, each of size 10.
- List<long[]> longList = new ArrayList<>();
-
- for (int i = 0; i < 10; i++) {
- long[] longArray = new long[10];
- for (int j = 0; j < 10; j++) {
- longArray[j] = random.nextLong();
- }
- longList.add(longArray);
- }
-
- // Apply the partition function and compare with expected values.
- for (int i = 0; i < 10; i++) {
- int actualHashValueFromLongArray = murmur3PartitionFunction.murmur3Hash32BitsX64(longList.get(i), hashSeed);
- assertEquals(actualHashValueFromLongArray, expectedHashValues[i]);
- }
- break;
- default:
- }
- }
- private void testToStringAndPartitionNumber(PartitionFunction partitionFunction, int testValueForGetPartition,
- int numPartitions) {
- int partition1 = partitionFunction.getPartition(testValueForGetPartition);
- int partition2 = partitionFunction.getPartition(Integer.toString(testValueForGetPartition));
- assertEquals(partition1, partition2);
- assertTrue(partition1 >= 0 && partition1 < numPartitions);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index a3dc1bbef5..e1218452d3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -207,12 +207,14 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
* @param value Value for which String value needs to be returned
* @return String value for the object.
*/
- protected static String getStringValue(Object value) {
+ public static String getStringValue(Object value) {
+ if (value instanceof BigDecimal) {
+ return ((BigDecimal) value).toPlainString();
+ }
if (value instanceof byte[]) {
return BytesUtils.toHexString((byte[]) value);
- } else {
- return value.toString();
}
+ return value.toString();
}
// Required by JSON de-serializer. DO NOT REMOVE.
@@ -562,6 +564,19 @@ public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
}
}
+ /**
+ * Converts the given value of this data type to its string representation. The input value for BYTES type should be a byte[] (rendered as hex); BIG_DECIMAL values are rendered with toPlainString().
+ */
+ public String toString(Object value) {
+ if (this == BIG_DECIMAL) {
+ return ((BigDecimal) value).toPlainString();
+ }
+ if (this == BYTES) {
+ return BytesUtils.toHexString((byte[]) value);
+ }
+ return value.toString();
+ }
+
/**
* Converts the given string value to the data type. Returns ByteArray for BYTES.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org