You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/05 22:59:35 UTC
(pinot) branch master updated: Add support for murmur3 as a partition function (#12049)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 49d0ff01a4 Add support for murmur3 as a partition function (#12049)
49d0ff01a4 is described below
commit 49d0ff01a4bce5a9fee3b15d40b0eb803fc11b64
Author: aishikbh <ai...@startree.ai>
AuthorDate: Wed Dec 6 04:29:30 2023 +0530
Add support for murmur3 as a partition function (#12049)
---
.../spi/partition/Murmur3PartitionFunction.java | 453 +++++++++++++++++++++
.../spi/partition/PartitionFunctionFactory.java | 5 +-
.../spi/partition/PartitionFunctionTest.java | 428 +++++++++++++++++++
3 files changed, 885 insertions(+), 1 deletion(-)
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
new file mode 100644
index 0000000000..d66eeca1ae
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hashing;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Implementation of {@link PartitionFunction} which partitions based on a 32 bit murmur3 hash.
+ *
+ * <p>Two variants are supported, selected with the {@code variant} config key:
+ * <ul>
+ *   <li>{@code x86_32} (default): the 32-bit x86 MurmurHash3, computed with Guava's {@code murmur3_32_fixed}.</li>
+ *   <li>{@code x64_32}: the upper 32 bits of the 64-bit x64 MurmurHash3, ported from Infinispan.</li>
+ * </ul>
+ */
+public class Murmur3PartitionFunction implements PartitionFunction {
+  /** Byte substituted for unpaired UTF-16 surrogates when Strings are encoded by the x64_32 String path. */
+  public static final byte INVALID_CHAR = (byte) '?';
+  private static final String NAME = "Murmur3";
+  private static final String SEED_KEY = "seed";
+  private static final String MURMUR3_VARIANT = "variant";
+  private final int _numPartitions;
+  private final int _hashSeed;
+  private final String _variant;
+
+  /**
+   * Creates a Murmur3 partition function.
+   *
+   * @param numPartitions number of partitions, must be > 0
+   * @param functionConfig optional configuration, may be {@code null}. Supported keys:
+   *     <ul>
+   *       <li>{@code seed}: integer hash seed; 0 when absent or empty.</li>
+   *       <li>{@code variant}: {@code x86_32} (used when absent or empty) or {@code x64_32}.</li>
+   *     </ul>
+   * @throws IllegalArgumentException if {@code numPartitions <= 0} or the variant is not supported
+   * @throws NumberFormatException if the seed is present but is not a parsable int
+   */
+  public Murmur3PartitionFunction(int numPartitions, Map<String, String> functionConfig) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+    // Look each key up once instead of repeating functionConfig.get(...) in every condition.
+    String variant = functionConfig == null ? null : functionConfig.get(MURMUR3_VARIANT);
+    String seed = functionConfig == null ? null : functionConfig.get(SEED_KEY);
+    boolean useDefaultVariant = variant == null || variant.isEmpty();
+    Preconditions.checkArgument(useDefaultVariant || variant.equals("x86_32") || variant.equals("x64_32"),
+        "Murmur3 variant must be either x86_32 or x64_32, got: %s", variant);
+    _numPartitions = numPartitions;
+
+    // Default value of the hash seed is 0.
+    _hashSeed = (seed == null || seed.isEmpty()) ? 0 : Integer.parseInt(seed);
+
+    // Default value of the murmur3 variant is x86_32.
+    _variant = useDefaultVariant ? "x86_32" : variant;
+  }
+
+  /**
+   * Returns the partition of {@code value}, in {@code [0, numPartitions)}.
+   *
+   * <p>With the x86_32 variant, {@code byte[]} values are hashed as-is and any other value is hashed as the UTF-8
+   * bytes of its {@code toString()}. With the x64_32 variant, dispatch is done by
+   * {@link #murmur3Hash32BitsX64(Object, int)}.
+   *
+   * @param value value to partition, must not be null
+   * @return partition id
+   */
+  @Override
+  public int getPartition(Object value) {
+    int hash;
+    if (_variant.equals("x86_32")) {
+      // byte[].toString() is an identity string such as "[B@1b6d3586" that differs between array instances and JVM
+      // runs, so hash the raw bytes instead (matching how the x64_32 path treats byte[]).
+      byte[] bytes = value instanceof byte[] ? (byte[]) value : value.toString().getBytes(UTF_8);
+      hash = murmur3Hash32BitsX86(bytes, _hashSeed);
+    } else {
+      hash = murmur3Hash32BitsX64(value, _hashSeed);
+    }
+    // Clear the sign bit so the modulo result is non-negative.
+    return (hash & Integer.MAX_VALUE) % _numPartitions;
+  }
+
+  /**
+   * Returns the registered name of this partition function.
+   */
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /**
+   * Returns the number of partitions that values are spread over.
+   */
+  @Override
+  public int getNumPartitions() {
+    return _numPartitions;
+  }
+
+  /**
+   * Returns the same string as {@link #getName()}. Kept for backward-compatibility; use {@code getName()} instead.
+   */
+  @Override
+  public String toString() {
+    return NAME;
+  }
+
+  /**
+   * Computes the 32-bit x86 MurmurHash3 of {@code data} using Guava's {@code murmur3_32_fixed}.
+   *
+   * @param data bytes to hash
+   * @param hashSeed hash seed
+   * @return 32-bit hash as an int
+   */
+  @VisibleForTesting
+  int murmur3Hash32BitsX86(byte[] data, int hashSeed) {
+    return Hashing.murmur3_32_fixed(hashSeed)
+        .hashBytes(data)
+        .asInt();
+  }
+
+  /*
+   * The x64 MurmurHash3 routines below (getblock, bmix, fmix and the murmur3Hash*BitsX64 methods) are taken from
+   * Infinispan's org.infinispan.commons.hash.MurmurHash3 (https://github.com/infinispan/infinispan), written by
+   * Patrick McFarland and based on Austin Appleby's original MurmurHash3
+   * (https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp).
+   *
+   * The 32 bit hash for the x64 architecture is not one of the original Murmur3 variants; it is the scheme used by
+   * Infinispan and Debezium. Parts Pinot does not need were removed and the code was formatted to Apache Pinot's
+   * Checkstyle. See also https://en.wikipedia.org/wiki/MurmurHash.
+   */
+
+  /**
+   * Reads the 8 bytes of {@code key} starting at offset {@code i} as a little-endian long.
+   *
+   * @param key source bytes; {@code i + 7} must be a valid index
+   * @param i offset of the first byte
+   * @return the assembled 64-bit block
+   */
+  private long getblock(byte[] key, int i) {
+    return ((key[i + 0] & 0x00000000000000FFL)) | ((key[i + 1] & 0x00000000000000FFL) << 8)
+        | ((key[i + 2] & 0x00000000000000FFL) << 16) | ((key[i + 3] & 0x00000000000000FFL) << 24)
+        | ((key[i + 4] & 0x00000000000000FFL) << 32) | ((key[i + 5] & 0x00000000000000FFL) << 40)
+        | ((key[i + 6] & 0x00000000000000FFL) << 48) | ((key[i + 7] & 0x00000000000000FFL) << 56);
+  }
+
+  /**
+   * Mixes the current block ({@code _k1}, {@code _k2}) into the running hash ({@code _h1}, {@code _h2}) and then
+   * advances the multipliers {@code _c1}/{@code _c2} for the next block. The statement order and constants are part
+   * of the hash definition and must match the Infinispan implementation exactly.
+   *
+   * @param state hash state, updated in place
+   */
+  private void bmix(State state) {
+    state._k1 *= state._c1;
+    state._k1 = Long.rotateLeft(state._k1, 23);
+    state._k1 *= state._c2;
+    state._h1 ^= state._k1;
+    state._h1 += state._h2;
+
+    state._h2 = Long.rotateLeft(state._h2, 41);
+
+    state._k2 *= state._c2;
+    state._k2 = Long.rotateLeft(state._k2, 23);
+    state._k2 *= state._c1;
+    state._h2 ^= state._k2;
+    state._h2 += state._h1;
+
+    state._h1 = state._h1 * 3 + 0x52dce729;
+    state._h2 = state._h2 * 3 + 0x38495ab5;
+
+    state._c1 = state._c1 * 5 + 0x7b7d159c;
+    state._c2 = state._c2 * 5 + 0x6bce6396;
+  }
+
+  /**
+   * MurmurHash3 64-bit finalization mix: three xor-shift steps by 33 interleaved with two constant multiplies.
+   *
+   * @param k value to finalize
+   * @return the mixed value
+   */
+  private long fmix(long k) {
+    long h = k;
+    h = (h ^ (h >>> 33)) * 0xff51afd7ed558ccdL;
+    h = (h ^ (h >>> 33)) * 0xc4ceb9fe1a85ec53L;
+    return h ^ (h >>> 33);
+  }
+
+ /**
+ * Hash a value using the x64 64 bit variant of MurmurHash3
+ *
+ * <p>Returns {@code h1}, the first 64 bits of the 128-bit x64 state after finalization. Complete 16-byte blocks are
+ * read as two little-endian longs via {@code getblock}; the remaining {@code key.length & 15} tail bytes are folded
+ * in with {@code (long) key[i]}, i.e. WITH sign extension (the bytes are not masked with 0xFF). That is how the
+ * Infinispan implementation this was taken from behaves (the tests compare against its output), so keep it as-is
+ * for hash compatibility.
+ *
+ * @param key value to hash
+ * @param seed hash seed; widened (sign-extended) to long and XOR-ed into both initial state words
+ * @return 64 bit hashed key
+ */
+ private long murmur3Hash64BitsX64(final byte[] key, final int seed) {
+ State state = new State();
+
+ state._h1 = 0x9368e53c2f6af274L ^ seed;
+ state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+ state._c1 = 0x87c37b91114253d5L;
+ state._c2 = 0x4cf5ad432745937fL;
+
+ // Body: mix each complete 16-byte block as two little-endian 64-bit lanes.
+ for (int i = 0; i < key.length / 16; i++) {
+ state._k1 = getblock(key, i * 2 * 8);
+ state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+ bmix(state);
+ }
+
+ // Start the tail from clean lanes.
+ state._k1 = 0;
+ state._k2 = 0;
+
+ // Offset of the first byte that is not part of a complete 16-byte block.
+ int tail = (key.length >>> 4) << 4;
+
+ // CHECKSTYLE:OFF
+ // Intentional fall-through: each case folds in one more tail byte; tail offsets 8-14 go into k2, 0-7 into k1.
+ switch (key.length & 15) {
+ case 15:
+ state._k2 ^= (long) key[tail + 14] << 48;
+ case 14:
+ state._k2 ^= (long) key[tail + 13] << 40;
+ case 13:
+ state._k2 ^= (long) key[tail + 12] << 32;
+ case 12:
+ state._k2 ^= (long) key[tail + 11] << 24;
+ case 11:
+ state._k2 ^= (long) key[tail + 10] << 16;
+ case 10:
+ state._k2 ^= (long) key[tail + 9] << 8;
+ case 9:
+ state._k2 ^= key[tail + 8];
+ case 8:
+ state._k1 ^= (long) key[tail + 7] << 56;
+ case 7:
+ state._k1 ^= (long) key[tail + 6] << 48;
+ case 6:
+ state._k1 ^= (long) key[tail + 5] << 40;
+ case 5:
+ state._k1 ^= (long) key[tail + 4] << 32;
+ case 4:
+ state._k1 ^= (long) key[tail + 3] << 24;
+ case 3:
+ state._k1 ^= (long) key[tail + 2] << 16;
+ case 2:
+ state._k1 ^= (long) key[tail + 1] << 8;
+ case 1:
+ state._k1 ^= key[tail + 0];
+ bmix(state);
+ }
+
+ // CHECKSTYLE:ON
+ // Finalization: fold in the length, cross-add the halves, fmix each, and cross-add again.
+ state._h2 ^= key.length;
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ state._h1 = fmix(state._h1);
+ state._h2 = fmix(state._h2);
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ return state._h1;
+ }
+
+  /**
+   * Hash a value using the x64 32 bit variant of MurmurHash3: the upper 32 bits of the 64-bit x64 hash of
+   * {@code key}.
+   *
+   * @param key bytes to hash
+   * @param seed hash seed
+   * @return 32 bit hashed key
+   */
+  private int murmur3Hash32BitsX64(final byte[] key, final int seed) {
+    long hash64 = murmur3Hash64BitsX64(key, seed);
+    return (int) (hash64 >>> 32);
+  }
+
+  /**
+   * Hashes {@code key} with the x64 variant of MurmurHash3, treating each long as one 64-bit lane, and returns
+   * {@code h1}, the first 64 bits of the 128-bit state.
+   *
+   * <p>Ported from Infinispan's MurmurHash3. Unlike the byte[] overload, {@code _k1}/{@code _k2} are not reset after
+   * the block loop. For an odd length >= 3 the trailing element is therefore XOR-ed into the previous block's
+   * already-mixed {@code _k1}, and the previous {@code _k2} is mixed again. This is kept for hash compatibility.
+   *
+   * @param key longs to hash; may be empty
+   * @param seed hash seed
+   * @return 64-bit hash
+   */
+  private long murmur3Hash64BitsX64(final long[] key, final int seed) {
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    for (int i = 0; i < key.length / 2; i++) {
+      state._k1 = key[i * 2];
+      state._k2 = key[i * 2 + 1];
+
+      bmix(state);
+    }
+
+    // Read the trailing element only when there is one. The source implementation read key[key.length - 1]
+    // unconditionally, which throws ArrayIndexOutOfBoundsException for an empty array.
+    if (key.length % 2 != 0) {
+      state._k1 ^= key[key.length - 1];
+      bmix(state);
+    }
+
+    // Fold in the input length in bytes.
+    state._h2 ^= key.length * 8;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return state._h1;
+  }
+
+  /**
+   * Hash a value using the x64 32 bit variant of MurmurHash3: the upper 32 bits of the 64-bit x64 hash of the
+   * {@code long[]} {@code key}.
+   *
+   * @param key longs to hash
+   * @param seed hash seed
+   * @return 32 bit hashed key
+   */
+  private int murmur3Hash32BitsX64(final long[] key, final int seed) {
+    long hash64 = murmur3Hash64BitsX64(key, seed);
+    return (int) (hash64 >>> 32);
+  }
+
+  /**
+   * Computes the 32-bit x64 MurmurHash3 of {@code o}, dispatching on its runtime type.
+   *
+   * <p>{@code byte[]} and {@code long[]} are hashed directly. Any other object, including a {@code String} (whose
+   * {@code toString()} returns itself), is hashed through the String path on its {@code toString()}.
+   *
+   * <p>This differs from the source implementation, whose default case hashes the object's hash code. Hash codes
+   * are not guaranteed to be consistent across JVMs (except for String values), so they cannot reproduce the data
+   * source's values. The string representation is used instead, which aligns with the rest of the code base.
+   *
+   * @param o value to hash; a null value throws NullPointerException
+   * @param seed hash seed
+   * @return 32-bit hash
+   */
+  @VisibleForTesting
+  int murmur3Hash32BitsX64(Object o, int seed) {
+    if (o instanceof byte[]) {
+      return murmur3Hash32BitsX64((byte[]) o, seed);
+    }
+    if (o instanceof long[]) {
+      return murmur3Hash32BitsX64((long[]) o, seed);
+    }
+    return murmur3Hash32BitsX64(o.toString(), seed);
+  }
+
+  /**
+   * Returns the upper 32 bits of the 64-bit x64 MurmurHash3 computed over the UTF-8 encoding of {@code s}.
+   */
+  private int murmur3Hash32BitsX64(String s, int seed) {
+    long hash64 = murmur3Hash32BitsX64String(s, seed);
+    // Unsigned shift for consistency with the other overloads; after the int cast the bits are identical either way.
+    return (int) (hash64 >>> 32);
+  }
+
+ /**
+ * Computes the 64-bit x64 MurmurHash3 ({@code h1}) of the UTF-8 encoding of {@code s} without materializing a byte
+ * array. Code points are encoded to UTF-8 on the fly and streamed into 16-byte blocks through {@code addByte}.
+ * Unpaired surrogates are encoded as {@link #INVALID_CHAR} ('?'), as {@code String.getBytes(UTF_8)} also does.
+ * The result is meant to match the byte[] overload on {@code s.getBytes(UTF_8)}; the tests check that both forms
+ * map to the same partition.
+ *
+ * <p>NOTE(review): despite "32Bits" in its name, this returns the full 64-bit {@code h1}; the caller keeps the
+ * upper 32 bits.
+ *
+ * @param s string to hash
+ * @param seed hash seed
+ * @return 64-bit hash ({@code h1})
+ */
+ private long murmur3Hash32BitsX64String(String s, long seed) {
+ // Exactly the same as MurmurHash3_x64_64, except it works directly on a String's chars
+ State state = new State();
+
+ state._h1 = 0x9368e53c2f6af274L ^ seed;
+ state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+ state._c1 = 0x87c37b91114253d5L;
+ state._c2 = 0x4cf5ad432745937fL;
+
+ // byteLen counts UTF-8 bytes emitted so far; it also selects the byte's slot within the current block.
+ int byteLen = 0;
+ int stringLen = s.length();
+
+ // CHECKSTYLE:OFF
+ for (int i = 0; i < stringLen; i++) {
+ // Decode one code point; a surrogate that is not part of a valid high/low pair becomes INVALID_CHAR.
+ char c1 = s.charAt(i);
+ int cp;
+ if (!Character.isSurrogate(c1)) {
+ cp = c1;
+ } else if (Character.isHighSurrogate(c1)) {
+ if (i + 1 < stringLen) {
+ char c2 = s.charAt(i + 1);
+ if (Character.isLowSurrogate(c2)) {
+ i++;
+ cp = Character.toCodePoint(c1, c2);
+ } else {
+ cp = INVALID_CHAR;
+ }
+ } else {
+ cp = INVALID_CHAR;
+ }
+ } else {
+ cp = INVALID_CHAR;
+ }
+
+ // Encode the code point as 1, 2, 3 or 4 UTF-8 bytes.
+ if (cp <= 0x7f) {
+ addByte(state, (byte) cp, byteLen++);
+ } else if (cp <= 0x07ff) {
+ byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
+ byte b2 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ } else if (cp <= 0xffff) {
+ byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
+ byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
+ byte b3 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ addByte(state, b3, byteLen++);
+ } else {
+ byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
+ byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
+ byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
+ byte b4 = (byte) (0x80 | (0x3f & cp));
+ addByte(state, b1, byteLen++);
+ addByte(state, b2, byteLen++);
+ addByte(state, b3, byteLen++);
+ addByte(state, b4, byteLen++);
+ }
+ }
+
+ // CHECKSTYLE:ON
+ // addByte has already mixed every complete 16-byte block and cleared k1/k2, so they now hold only the last
+ // (byteLen & 15) bytes, unmasked. Move them out and re-fold them one byte at a time WITH sign extension, exactly
+ // like the tail handling of the byte[] overload.
+ long savedK1 = state._k1;
+ long savedK2 = state._k2;
+ state._k1 = 0;
+ state._k2 = 0;
+
+ // CHECKSTYLE:OFF
+ // Intentional fall-through: each case folds in one more tail byte.
+ switch (byteLen & 15) {
+ case 15:
+ state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
+ case 14:
+ state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
+ case 13:
+ state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
+ case 12:
+ state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
+ case 11:
+ state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
+ case 10:
+ state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
+ case 9:
+ state._k2 ^= ((byte) savedK2);
+ case 8:
+ state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
+ case 7:
+ state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
+ case 6:
+ state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
+ case 5:
+ state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
+ case 4:
+ state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
+ case 3:
+ state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
+ case 2:
+ state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
+ case 1:
+ state._k1 ^= ((byte) savedK1);
+ bmix(state);
+ }
+ // CHECKSTYLE:ON
+ // Finalization, identical to the byte[] overload (byteLen plays the role of key.length).
+ state._h2 ^= byteLen;
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ state._h1 = fmix(state._h1);
+ state._h2 = fmix(state._h2);
+
+ state._h1 += state._h2;
+ state._h2 += state._h1;
+
+ return state._h1;
+ }
+
+  /**
+   * Appends byte {@code b}, the {@code len}-th (0-based) byte of the input, to the current 16-byte block. Block bytes
+   * 0-7 go into {@code _k1} and bytes 8-15 into {@code _k2}, little-endian. After the 16th byte of a block, the block
+   * is mixed with {@code bmix} and {@code _k1}/{@code _k2} are cleared for the next one.
+   *
+   * @param state hash state, updated in place
+   * @param b byte to append
+   * @param len index of {@code b} within the whole input
+   */
+  private void addByte(State state, byte b, int len) {
+    long shifted = (b & 0xffL) << (8 * (len & 0x7));
+    if ((len & 0x8) == 0) {
+      // First half of the block.
+      state._k1 |= shifted;
+      return;
+    }
+    // Second half of the block.
+    state._k2 |= shifted;
+    boolean blockComplete = (len & 0xf) == 0xf;
+    if (blockComplete) {
+      bmix(state);
+      state._k1 = 0;
+      state._k2 = 0;
+    }
+  }
+
+ /**
+ * Mutable working state shared by the x64 MurmurHash3 helpers of this class.
+ */
+ static class State {
+ // Running 128-bit hash (two 64-bit halves).
+ long _h1;
+ long _h2;
+
+ // The block currently being assembled/mixed.
+ long _k1;
+ long _k2;
+
+ // Block multipliers; bmix advances them after every block.
+ long _c1;
+ long _c2;
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 77bfc4c08d..13a60b424b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -28,7 +28,7 @@ import java.util.Map;
public class PartitionFunctionFactory {
// Enum for various partition functions to be added.
public enum PartitionFunctionType {
- Modulo, Murmur, ByteArray, HashCode, BoundedColumnValue;
+ Modulo, Murmur, Murmur3, ByteArray, HashCode, BoundedColumnValue;
// Add more functions here.
private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
@@ -77,6 +77,9 @@ public class PartitionFunctionFactory {
case Murmur:
return new MurmurPartitionFunction(numPartitions);
+ case Murmur3:
+ return new Murmur3PartitionFunction(numPartitions, functionConfig);
+
case ByteArray:
return new ByteArrayPartitionFunction(numPartitions);
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index 924cc87ffc..dec22a663d 100644
--- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -19,7 +19,9 @@
package org.apache.pinot.segment.spi.partition;
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -115,6 +117,233 @@ public class PartitionFunctionTest {
}
}
+  /**
+   * Unit test for {@link Murmur3PartitionFunction}.
+   * <ul>
+   *   <li>Tests that partition values are in the expected range.</li>
+   *   <li>Tests that getName() and toString() return the same string.</li>
+   *   <li>Tests the default behaviors when functionConfig is null, empty, or provides only some of the optional
+   *   parameters ({@code seed}, {@code variant}), including empty-string values.</li>
+   * </ul>
+   * The time-based random seed is included in assertion messages so that a failing run can be reproduced.
+   */
+  @Test
+  public void testMurmur3Partitioner() {
+    long seed = System.currentTimeMillis();
+    Random random = new Random(seed);
+    String seedMessage = "Random seed: " + seed;
+
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      int numPartitions = random.nextInt(MAX_NUM_PARTITIONS) + 1;
+
+      String functionName = "MurMUr3";
+      String valueTobeHashed = String.valueOf(random.nextInt());
+      Map<String, String> functionConfig = new HashMap<>();
+
+      // Create partition function with function config as null.
+      PartitionFunction partitionFunction1 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, null);
+
+      // Check getName and toString equivalence.
+      assertEquals(partitionFunction1.getName(), partitionFunction1.toString(), seedMessage);
+
+      // Get partition number with random value.
+      int partitionNumWithNullConfig = partitionFunction1.getPartition(valueTobeHashed);
+
+      // Create partition function with an empty (non-null) function config: no seed and no variant.
+      PartitionFunction partitionFunction2 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Get partition number with the same value.
+      int partitionNumWithNoSeedValue = partitionFunction2.getPartition(valueTobeHashed);
+
+      // A null function config and an empty function config must both fall back to the defaults.
+      assertEquals(partitionNumWithNullConfig, partitionNumWithNoSeedValue, seedMessage);
+
+      // Put a random seed value in the "seed" field of the function config.
+      functionConfig.put("seed", Integer.toString(random.nextInt()));
+
+      // Create partition function with a random seed and no variant.
+      PartitionFunction partitionFunction3 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Create partition function with the random seed and variant "x64_32".
+      functionConfig.put("variant", "x64_32");
+      PartitionFunction partitionFunction4 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Explicitly configure the default variant ("x86_32") and the default seed (0).
+      functionConfig.put("variant", "x86_32");
+      functionConfig.put("seed", "0");
+      PartitionFunction partitionFunction5 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Explicitly configured defaults must behave like the null config.
+      assertEquals(partitionFunction5.getPartition(valueTobeHashed), partitionNumWithNullConfig, seedMessage);
+
+      // An empty seed string must fall back to the default seed.
+      functionConfig.put("seed", "");
+      PartitionFunction partitionFunction6 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+      assertEquals(partitionFunction6.getPartition(valueTobeHashed), partitionNumWithNullConfig, seedMessage);
+
+      // An empty variant string (together with an empty seed) must fall back to the default variant.
+      functionConfig.put("variant", "");
+      PartitionFunction partitionFunction7 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+      assertEquals(partitionFunction7.getPartition(valueTobeHashed), partitionNumWithNullConfig, seedMessage);
+
+      testBasicProperties(partitionFunction1, functionName, numPartitions);
+      testBasicProperties(partitionFunction2, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction3, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction4, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction5, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction6, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction7, functionName, numPartitions, functionConfig);
+
+      for (int j = 0; j < NUM_ROUNDS; j++) {
+        int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
+
+        // Null functionConfig.
+        testToStringAndPartitionNumber(partitionFunction1, value, numPartitions);
+
+        // Empty functionConfig (no seed, no variant).
+        testToStringAndPartitionNumber(partitionFunction2, value, numPartitions);
+
+        // Random seed, default variant.
+        testToStringAndPartitionNumber(partitionFunction3, value, numPartitions);
+
+        // Random seed, variant x64_32.
+        testToStringAndPartitionNumber(partitionFunction4, value, numPartitions);
+
+        // Explicitly provided default seed and variant.
+        testToStringAndPartitionNumber(partitionFunction5, value, numPartitions);
+
+        // Empty seed value, explicit default variant.
+        testToStringAndPartitionNumber(partitionFunction6, value, numPartitions);
+
+        // Empty seed value and empty variant.
+        testToStringAndPartitionNumber(partitionFunction7, value, numPartitions);
+      }
+    }
+  }
+
+  @Test
+  public void testMurmur3Equivalence() {
+    // The inputs are generated in testMurmur3HashEquivalenceForDifferentDataTypes from a Random seeded with 100
+    // (10 values per data type). The expected values were computed with org.infinispan.commons.hash.MurmurHash3.
+
+    // byte[] inputs (size 7, Random::nextBytes); MurmurHash3_x64_32 with seed = 0.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray = new int[]{
+        -1569442405, -921191038, 16439113, -881572510, 2111401876, 655879980, 1409856380, -1348364123, -1770645361,
+        1277101955
+    };
+
+    // byte[] inputs (size 7, Random::nextBytes); MurmurHash3_x64_32 with seed = 9001.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray = new int[]{
+        698083240, 174075836, -938825597, 155806634, -831733828, 319389887, -939822329, -1785781936, -1796939240,
+        757512622
+    };
+
+    // long[] inputs (generated by testMurmur3HashEquivalenceForDifferentDataTypes); MurmurHash3_x64_32, seed = 0.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray = new int[]{
+        -621156783, -1341356662, 1615513844, 1608247599, -1339558745, -1782606270, 1204009437, 8939246, -42073819,
+        1268621125
+    };
+
+    // long[] inputs (generated by testMurmur3HashEquivalenceForDifferentDataTypes); MurmurHash3_x64_32, seed = 9001.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray = new int[]{
+        -159780599, 1266925141, -2039451704, 237323842, -1373894107, -1718192521, 314068498, 1377198162, 1239340429,
+        -1643307044
+    };
+
+    // String inputs (from 7 random bytes, Random::nextBytes); MurmurHash3_x64_64_String with seed = 0, then the
+    // upper 32 bits (right shift by 32).
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForString = new int[]{
+        -930531654, 1010637996, -1251084035, -1551293561, 1591443335, 181872103, 1308755538, -432310401, -701537488,
+        674867586
+    };
+
+    // String inputs (from 7 random bytes, Random::nextBytes); MurmurHash3_x64_64_String with seed = 9001, then the
+    // upper 32 bits (right shift by 32).
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForString = new int[]{
+        1558697417, 933581816, 1071120824, 1964512897, 1629803052, 2037246152, -1867319466, -1003065762, -275998120,
+        1386652858
+    };
+
+    // byte[] inputs (size 7, Random::nextBytes); MurmurHash3_x86_32 with seed = 0.
+    int[] expectedMurmurValuesFor32BitX86WithZeroSeed = new int[]{
+        1255034832, -395463542, 659973067, 1070436837, -1193041642, -1412829846, -483463488, -1385092001, 568671606,
+        -807299446
+    };
+
+    // byte[] inputs (size 7, Random::nextBytes); MurmurHash3_x86_32 with seed = 9001.
+    int[] expectedMurmurValuesFor32BitX86WithNonZeroSeed = new int[]{
+        -590969347, -315366997, 1642137565, -1732240651, -597560989, -1430018124, -448506674, 410998174, -1912106487,
+        -19253806
+    };
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 0, byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray,
+        "byteArray", "x64_32");
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 9001, byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray,
+        "byteArray", "x64_32");
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 0, long array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray,
+        "longArray", "x64_32");
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 9001, long array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray,
+        "longArray", "x64_32");
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 0, String.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForString, "String",
+        "x64_32");
+
+    // 32 bit murmur3 hash, x64_32 variant, seed = 9001, String.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForString,
+        "String", "x64_32");
+
+    // 32 bit murmur3 hash, x86_32 variant, seed = 0, byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX86WithZeroSeed, "byteArray",
+        "x86_32");
+
+    // 32 bit murmur3 hash, x86_32 variant, seed = 9001, byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, "byteArray",
+        "x86_32");
+  }
+
/**
* Unit test for {@link MurmurPartitionFunction}.
* <ul>
@@ -275,6 +504,95 @@ public class PartitionFunctionTest {
testPartitionFunctionEquivalence(murmurPartitionFunction, expectedPartitions);
}
+  @Test
+  public void testMurmur3PartitionFunctionEquivalence() {
+    // Expected partitions (5 partitions). The inputs are regenerated by the helpers from a Random seeded with 100.
+
+    // 10 String values built from 7 random bytes (Random::nextBytes), and their UTF-8 bytes; x64_32 variant
+    // (org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32) with seed = 0.
+    int[] expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString = new int[]{
+        4, 1, 3, 2, 0, 3, 3, 2, 0, 1
+    };
+
+    // Same String/byte[] inputs; x64_32 variant with seed = 9001.
+    int[] expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString = new int[]{
+        2, 1, 4, 2, 2, 2, 2, 1, 3, 3
+    };
+
+    // 10 long[] values of size 10 (Random::nextLong); x64_32 variant with seed = 0.
+    int[] expectedPartitions32BitsX64WithZeroSeedForLongArray = new int[]{
+        0, 1, 4, 4, 3, 3, 2, 1, 4, 0
+    };
+
+    // Same long[] inputs; x64_32 variant with seed = 9001.
+    int[] expectedPartitions32BitsX64WithNonZeroSeedForLongArray = new int[]{
+        4, 1, 4, 2, 1, 2, 3, 2, 4, 4
+    };
+
+    // 10 String values of size 7 (Random::nextBytes); x86_32 variant
+    // (com.google.common.hash.Hashing::murmur3_32_fixed) with seed = 0.
+    int[] expectedPartitions32BitsX86WithZeroSeed = new int[]{
+        4, 3, 3, 2, 3, 4, 0, 3, 1, 4
+    };
+
+    // Same String inputs; x86_32 variant (com.google.common.hash.Hashing::murmur3_32_fixed) with seed = 9001.
+    int[] expectedPartitions32BitsX86WithNonZeroSeed = new int[]{
+        2, 1, 3, 2, 2, 1, 1, 4, 4, 2
+    };
+
+    // Initialize Murmur3PartitionFunction with 5 partitions and variant "x64_32".
+    int numPartitions = 5;
+    Map<String, String> functionConfig = new HashMap<>();
+    functionConfig.put("variant", "x64_32");
+
+    // x64_32 variant with seed = 0.
+    Murmur3PartitionFunction murmur3PartitionFunction1 = new Murmur3PartitionFunction(numPartitions, functionConfig);
+
+    // Put seed value in the "seed" field of the function config.
+    functionConfig.put("seed", Integer.toString(9001));
+
+    // x64_32 variant with seed = 9001.
+    Murmur3PartitionFunction murmur3PartitionFunction2 = new Murmur3PartitionFunction(numPartitions, functionConfig);
+
+    // Default x86_32 variant with default seed = 0.
+    Murmur3PartitionFunction murmur3PartitionFunction3 = new Murmur3PartitionFunction(numPartitions, null);
+
+    // Remove the variant field so that the default x86_32 variant is used.
+    functionConfig.remove("variant");
+
+    // x86_32 variant with seed = 9001.
+    Murmur3PartitionFunction murmur3PartitionFunction4 = new Murmur3PartitionFunction(numPartitions, functionConfig);
+
+    // The String and byte[] forms of the same inputs must land in the same, expected partition (x64_32 variant,
+    // seed = 0 and seed = 9001).
+    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction1,
+        expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString);
+    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction2,
+        expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString);
+
+    // long[] inputs with the x64_32 variant, seed = 0 and seed = 9001.
+    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction1,
+        expectedPartitions32BitsX64WithZeroSeedForLongArray);
+    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction2,
+        expectedPartitions32BitsX64WithNonZeroSeedForLongArray);
+
+    // The same 10 String values with the x86_32 variant, seed = 0 and seed = 9001.
+    testPartitionFunctionEquivalence(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
+    testPartitionFunctionEquivalence(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
+  }
+
/**
* Tests the equivalence of kafka.producer.ByteArrayPartitioner::partition and {@link ByteArrayPartitionFunction
* ::getPartition}
@@ -309,4 +627,114 @@ public class PartitionFunctionTest {
assertEquals(actualPartition, expectedPartition);
}
}
+
+  /**
+   * Checks that {@code partitionFunction} maps each pseudo-random String and its UTF-8 encoding to the same
+   * partition, and that this partition equals the matching entry of {@code expectedPartitions}.
+   *
+   * <p>Each value is built from 7 bytes drawn from a {@link Random} seeded with 100 and decoded as UTF-8. Malformed
+   * sequences are replaced by the String constructor. One value is generated per expected partition.
+   *
+   * @param partitionFunction function under test
+   * @param expectedPartitions expected partition for each generated value, in generation order
+   */
+  private void testPartitionFunctionEquivalenceWithStringAndByteArray(PartitionFunction partitionFunction,
+      int[] expectedPartitions) {
+    Random random = new Random(100L);
+    byte[] randomBytes = new byte[7];
+    for (int i = 0; i < expectedPartitions.length; i++) {
+      random.nextBytes(randomBytes);
+      String value = new String(randomBytes, UTF_8);
+      byte[] encoded = value.getBytes(UTF_8);
+      int partitionOfString = partitionFunction.getPartition(value);
+      // The byte[] form of the same UTF-8 content must land in the same partition as the String form.
+      assertEquals(partitionOfString, partitionFunction.getPartition(encoded));
+      assertEquals(partitionOfString, expectedPartitions[i]);
+    }
+  }
+
+  /**
+   * Checks that {@code partitionFunction} maps each pseudo-random {@code long[]} to the matching entry of
+   * {@code expectedPartitions}.
+   *
+   * <p>One array of 10 longs is generated per expected partition, from a {@link Random} seeded with 100. The inputs are
+   * therefore the same on every run. The number of inputs follows {@code expectedPartitions.length}, so every expected
+   * value is checked. Previously the count was hard-coded to 10, which silently skipped extra expectations and overran
+   * shorter arrays.
+   *
+   * @param partitionFunction function under test
+   * @param expectedPartitions expected partition for each generated array, in generation order
+   */
+  private void testPartitionFunctionEquivalenceWithLongArray(PartitionFunction partitionFunction,
+      int[] expectedPartitions) {
+    Random random = new Random(100);
+    for (int expectedPartition : expectedPartitions) {
+      long[] longArray = new long[10];
+      for (int j = 0; j < longArray.length; j++) {
+        longArray[j] = random.nextLong();
+      }
+      assertEquals(partitionFunction.getPartition(longArray), expectedPartition);
+    }
+  }
+
+  /**
+   * Checks raw murmur3 hash values computed by {@link Murmur3PartitionFunction} against expected values for
+   * pseudo-random inputs of the given data type.
+   *
+   * <p>Inputs are drawn from a {@link Random} seeded with 100, and one input is generated per expected hash value:
+   * <ul>
+   *   <li>{@code "string"}: 7 random bytes decoded as UTF-8, hashed with {@code murmur3Hash32BitsX64}. The
+   *   {@code variant} argument is not consulted for this type.</li>
+   *   <li>{@code "bytearray"}: 7 random bytes, hashed with {@code murmur3Hash32BitsX64} when {@code variant} is
+   *   {@code "x64_32"}, and otherwise (including a null variant) with {@code murmur3Hash32BitsX86}.</li>
+   *   <li>{@code "longarray"}: an array of 10 random longs, hashed with {@code murmur3Hash32BitsX64}.</li>
+   * </ul>
+   *
+   * @param hashSeed seed passed to the murmur3 hash
+   * @param expectedHashValues expected hash for each generated input, in generation order
+   * @param dataType input type, matched case-insensitively
+   * @param variant hash variant; only consulted for {@code "bytearray"}
+   * @throws IllegalArgumentException if {@code dataType} is not a supported type, so that a typo cannot make the test
+   *     pass without checking anything
+   */
+  private void testMurmur3HashEquivalenceForDifferentDataTypes(int hashSeed, int[] expectedHashValues, String dataType,
+      String variant) {
+    // Only one case runs, so a single seeded Random yields the same inputs as seeding one per case.
+    Random random = new Random(100);
+    int numPartitions = 5;
+    Murmur3PartitionFunction murmur3PartitionFunction = new Murmur3PartitionFunction(numPartitions, null);
+
+    // Locale.ROOT keeps the match independent of the default locale (e.g. "STRING" under a Turkish locale).
+    switch (dataType.toLowerCase(java.util.Locale.ROOT)) {
+      case "string": {
+        byte[] bytes = new byte[7];
+        for (int expectedHashValue : expectedHashValues) {
+          random.nextBytes(bytes);
+          String value = new String(bytes, UTF_8);
+          assertEquals(murmur3PartitionFunction.murmur3Hash32BitsX64(value, hashSeed), expectedHashValue);
+        }
+        break;
+      }
+      case "bytearray": {
+        // Null-safe comparison: a missing variant falls through to x86_32.
+        boolean useX64 = "x64_32".equals(variant);
+        byte[] bytes = new byte[7];
+        for (int expectedHashValue : expectedHashValues) {
+          random.nextBytes(bytes);
+          int actualHashValue = useX64 ? murmur3PartitionFunction.murmur3Hash32BitsX64(bytes, hashSeed)
+              : murmur3PartitionFunction.murmur3Hash32BitsX86(bytes, hashSeed);
+          assertEquals(actualHashValue, expectedHashValue);
+        }
+        break;
+      }
+      case "longarray": {
+        // One 10-element array per expected value; the count follows expectedHashValues rather than a fixed 10.
+        for (int expectedHashValue : expectedHashValues) {
+          long[] values = new long[10];
+          for (int j = 0; j < values.length; j++) {
+            values[j] = random.nextLong();
+          }
+          assertEquals(murmur3PartitionFunction.murmur3Hash32BitsX64(values, hashSeed), expectedHashValue);
+        }
+        break;
+      }
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+
+  /**
+   * Checks that {@code partitionFunction} assigns an int and its decimal String form to the same partition, and that
+   * this partition lies in {@code [0, numPartitions)}.
+   *
+   * @param partitionFunction function under test
+   * @param testValueForGetPartition value to partition
+   * @param numPartitions exclusive upper bound for the returned partition id
+   */
+  private void testToStringAndPartitionNumber(PartitionFunction partitionFunction, int testValueForGetPartition,
+      int numPartitions) {
+    String asString = String.valueOf(testValueForGetPartition);
+    int fromInt = partitionFunction.getPartition(testValueForGetPartition);
+    int fromString = partitionFunction.getPartition(asString);
+    assertEquals(fromInt, fromString);
+    assertTrue(0 <= fromInt && fromInt < numPartitions);
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org