You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/12/05 22:59:35 UTC

(pinot) branch master updated: Add support for murmur3 as a partition function (#12049)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d0ff01a4 Add support for murmur3 as a partition function (#12049)
49d0ff01a4 is described below

commit 49d0ff01a4bce5a9fee3b15d40b0eb803fc11b64
Author: aishikbh <ai...@startree.ai>
AuthorDate: Wed Dec 6 04:29:30 2023 +0530

    Add support for murmur3 as a partition function (#12049)
---
 .../spi/partition/Murmur3PartitionFunction.java    | 453 +++++++++++++++++++++
 .../spi/partition/PartitionFunctionFactory.java    |   5 +-
 .../spi/partition/PartitionFunctionTest.java       | 428 +++++++++++++++++++
 3 files changed, 885 insertions(+), 1 deletion(-)

diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
new file mode 100644
index 0000000000..d66eeca1ae
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/Murmur3PartitionFunction.java
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.partition;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.hash.Hashing;
+import java.util.Map;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Implementation of {@link PartitionFunction} which partitions based on 32 bit murmur3 hash
+ */
+public class Murmur3PartitionFunction implements PartitionFunction {
+  public static final byte INVALID_CHAR = (byte) '?';
+  private static final String NAME = "Murmur3";
+  private static final String SEED_KEY = "seed";
+  private static final String MURMUR3_VARIANT = "variant";
+  private final int _numPartitions;
+  private final int _hashSeed;
+  private final String _variant;
+
+  /**
+   * Creates a Murmur3-based partition function.
+   *
+   * <p>Two optional settings are read from {@code functionConfig}: {@code "seed"} (parsed with
+   * {@link Integer#parseInt(String)}, defaults to 0) and {@code "variant"} ({@code "x86_32"} or {@code "x64_32"},
+   * defaults to {@code "x86_32"}). A {@code null} map, a missing key or an empty value all select the default.
+   *
+   * @param numPartitions Number of partitions, must be greater than 0.
+   * @param functionConfig Optional configuration of the partition function, may be {@code null}.
+   */
+  public Murmur3PartitionFunction(int numPartitions, Map<String, String> functionConfig) {
+    Preconditions.checkArgument(numPartitions > 0, "Number of partitions must be > 0");
+
+    String configuredVariant = functionConfig == null ? null : functionConfig.get(MURMUR3_VARIANT);
+    boolean useDefaultVariant = configuredVariant == null || configuredVariant.isEmpty();
+    Preconditions.checkArgument(
+        useDefaultVariant || configuredVariant.equals("x86_32") || configuredVariant.equals("x64_32"),
+        "Murmur3 variant must be either x86_32 or x64_32");
+    _numPartitions = numPartitions;
+
+    // When no seed is configured the hash is seeded with 0.
+    String configuredSeed = functionConfig == null ? null : functionConfig.get(SEED_KEY);
+    boolean useDefaultSeed = configuredSeed == null || configuredSeed.isEmpty();
+    _hashSeed = useDefaultSeed ? 0 : Integer.parseInt(configuredSeed);
+
+    // When no variant is configured the x86_32 variant is used.
+    _variant = useDefaultVariant ? "x86_32" : configuredVariant;
+  }
+
+  /**
+   * Maps {@code value} to a partition id in {@code [0, numPartitions)}.
+   *
+   * <p>For the {@code x86_32} variant the UTF-8 bytes of {@code value.toString()} are hashed; for any other variant
+   * the value is handed to the x64 implementation as is. The sign bit of the hash is cleared before the modulo so
+   * the result is never negative.
+   */
+  @Override
+  public int getPartition(Object value) {
+    int hash = "x86_32".equals(_variant) ? murmur3Hash32BitsX86(value.toString().getBytes(UTF_8), _hashSeed)
+        : murmur3Hash32BitsX64(value, _hashSeed);
+    return (hash & Integer.MAX_VALUE) % _numPartitions;
+  }
+
+  /** Returns the name of this partition function, {@code "Murmur3"}. */
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /** Returns the number of partitions this function was constructed with. */
+  @Override
+  public int getNumPartitions() {
+    return _numPartitions;
+  }
+
+  // Keep it for backward-compatibility, use getName() instead. Returns the same constant as getName().
+  @Override
+  public String toString() {
+    return NAME;
+  }
+
+  /**
+   * Computes the 32 bit hash of {@code data} for the {@code x86_32} variant by delegating to Guava's
+   * {@code Hashing.murmur3_32_fixed(hashSeed)}.
+   *
+   * @param data bytes to hash
+   * @param hashSeed seed passed to the Guava hash function
+   * @return the hash as an int (may be negative)
+   */
+  @VisibleForTesting
+  int murmur3Hash32BitsX86(byte[] data, int hashSeed) {
+    return Hashing.murmur3_32_fixed(hashSeed).hashBytes(data).asInt();
+  }
+
+  /**
+   * Taken from <a href=
+   * "https://github.com/infinispan/infinispan/blob/main/commons/all/src/main/java/org/infinispan/commons/hash
+   * /MurmurHash3.java"
+   * >Infinispan code base</a>.
+   *
+   * MurmurHash3 implementation in Java, based on Austin Appleby's <a href=
+   * "https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp"
+   * >original in C</a>
+   *
+   * This is an implementation of MurmurHash3 to generate 32 bit hash for x64 architecture (not part of the original
+   * Murmur3 implementations) used by Infinispan and Debezium. The parts that we don't need were removed and the
+   * code was formatted to Apache Pinot's Checkstyle.
+   *
+   * @author Patrick McFarland
+   * @see <a href="http://sites.google.com/site/murmurhash/">MurmurHash website</a>
+   * @see <a href="http://en.wikipedia.org/wiki/MurmurHash">MurmurHash entry on Wikipedia</a>
+   */
+
+  // Assembles the eight bytes starting at key[i] into a long, least significant byte first. Each byte is masked to
+  // its unsigned value before it is shifted into place.
+  private long getblock(byte[] key, int i) {
+    long block = 0L;
+    for (int offset = 0; offset < Long.BYTES; offset++) {
+      block |= (key[i + offset] & 0xFFL) << (Byte.SIZE * offset);
+    }
+    return block;
+  }
+
+  // Mixes the current input block (state._k1, state._k2) into the running hash (state._h1, state._h2) and then
+  // advances the multipliers state._c1 and state._c2, so the next block is mixed with different constants.
+  // Every field of the state is modified. The statement order matters for the resulting hash value.
+  private void bmix(State state) {
+    state._k1 *= state._c1;
+    // '-' binds tighter than '>>>', so this is a rotate-left by 23 bits.
+    state._k1 = (state._k1 << 23) | (state._k1 >>> 64 - 23);
+    state._k1 *= state._c2;
+    state._h1 ^= state._k1;
+    state._h1 += state._h2;
+
+    // Rotate-left by 41 bits.
+    state._h2 = (state._h2 << 41) | (state._h2 >>> 64 - 41);
+
+    // Same treatment for the second half of the block, with the multipliers swapped.
+    state._k2 *= state._c2;
+    state._k2 = (state._k2 << 23) | (state._k2 >>> 64 - 23);
+    state._k2 *= state._c1;
+    state._h2 ^= state._k2;
+    state._h2 += state._h1;
+
+    state._h1 = state._h1 * 3 + 0x52dce729;
+    state._h2 = state._h2 * 3 + 0x38495ab5;
+
+    state._c1 = state._c1 * 5 + 0x7b7d159c;
+    state._c2 = state._c2 * 5 + 0x6bce6396;
+  }
+
+  // Finalization mix: three xor-shift-by-33 steps interleaved with two multiplications by fixed 64 bit constants.
+  private long fmix(long k) {
+    long mixed = k;
+    mixed = (mixed ^ (mixed >>> 33)) * 0xff51afd7ed558ccdL;
+    mixed = (mixed ^ (mixed >>> 33)) * 0xc4ceb9fe1a85ec53L;
+    return mixed ^ (mixed >>> 33);
+  }
+
+  /**
+   * Hash a byte array using the x64 64 bit variant of MurmurHash3.
+   *
+   * <p>The input is consumed in 16 byte blocks; the remaining 0-15 bytes are folded in by the fall-through switch
+   * below. Only {@code _h1} of the final state is returned.
+   *
+   * @param key value to hash
+   * @param seed seed value, XOR-ed into both initial hash words
+   * @return 64 bit hashed key
+   */
+  private long murmur3Hash64BitsX64(final byte[] key, final int seed) {
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    // Mix every complete 16 byte block: two little-endian longs per block.
+    for (int i = 0; i < key.length / 16; i++) {
+      state._k1 = getblock(key, i * 2 * 8);
+      state._k2 = getblock(key, (i * 2 + 1) * 8);
+
+      bmix(state);
+    }
+
+    state._k1 = 0;
+    state._k2 = 0;
+
+    // Index of the first byte that is not part of a complete 16 byte block.
+    int tail = (key.length >>> 4) << 4;
+
+    // Intentional fall-through: start at the case matching the number of leftover bytes and run down to case 1.
+    // Unlike getblock(), the bytes are widened to long without masking, so negative bytes are sign-extended.
+    // NOTE(review): presumably kept this way to stay bit-compatible with the upstream implementation - confirm
+    // before "fixing".
+    // CHECKSTYLE:OFF
+    switch (key.length & 15) {
+      case 15:
+        state._k2 ^= (long) key[tail + 14] << 48;
+      case 14:
+        state._k2 ^= (long) key[tail + 13] << 40;
+      case 13:
+        state._k2 ^= (long) key[tail + 12] << 32;
+      case 12:
+        state._k2 ^= (long) key[tail + 11] << 24;
+      case 11:
+        state._k2 ^= (long) key[tail + 10] << 16;
+      case 10:
+        state._k2 ^= (long) key[tail + 9] << 8;
+      case 9:
+        state._k2 ^= key[tail + 8];
+      case 8:
+        state._k1 ^= (long) key[tail + 7] << 56;
+      case 7:
+        state._k1 ^= (long) key[tail + 6] << 48;
+      case 6:
+        state._k1 ^= (long) key[tail + 5] << 40;
+      case 5:
+        state._k1 ^= (long) key[tail + 4] << 32;
+      case 4:
+        state._k1 ^= (long) key[tail + 3] << 24;
+      case 3:
+        state._k1 ^= (long) key[tail + 2] << 16;
+      case 2:
+        state._k1 ^= (long) key[tail + 1] << 8;
+      case 1:
+        state._k1 ^= key[tail + 0];
+        // Reached for every non-empty tail; with no leftover bytes no extra mix happens.
+        bmix(state);
+    }
+
+    // CHECKSTYLE:ON
+    // Fold the input length (in bytes) into the hash before finalization.
+    state._h2 ^= key.length;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return state._h1;
+  }
+
+  /**
+   * Hash a byte array using the x64 32 bit variant of MurmurHash3, i.e. the upper 32 bits of the 64 bit hash.
+   *
+   * @param key value to hash
+   * @param seed seed value
+   * @return 32 bit hashed key
+   */
+  private int murmur3Hash32BitsX64(final byte[] key, final int seed) {
+    final long hash64 = murmur3Hash64BitsX64(key, seed);
+    return (int) (hash64 >>> 32);
+  }
+
+  /**
+   * Hash a long array using the x64 64 bit variant of MurmurHash3.
+   *
+   * <p>Elements are consumed in pairs; for an odd number of elements the last one is folded in separately. An empty
+   * array is valid input and hashes only the seed and the length (0).
+   *
+   * @param key value to hash
+   * @param seed seed value, XOR-ed into both initial hash words
+   * @return 64 bit hashed key
+   */
+  private long murmur3Hash64BitsX64(final long[] key, final int seed) {
+    // Exactly the same as MurmurHash3_x64_128, except it only returns state.h1
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    for (int i = 0; i < key.length / 2; i++) {
+      state._k1 = key[i * 2];
+      state._k2 = key[i * 2 + 1];
+
+      bmix(state);
+    }
+
+    if (key.length % 2 != 0) {
+      // The last element is read only here: reading key[key.length - 1] unconditionally would throw
+      // ArrayIndexOutOfBoundsException for an empty array even though the value is not needed for even lengths.
+      // _k1/_k2 are not reset before the tail is XOR-ed in, so they still hold the values left by the previous
+      // block. NOTE(review): presumably required to match the hashes produced by the upstream implementation the
+      // tests were generated from - do not reset them without confirming.
+      state._k1 ^= key[key.length - 1];
+      bmix(state);
+    }
+
+    // Fold the input length in bytes (8 per long) into the hash before finalization.
+    state._h2 ^= key.length * 8;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return state._h1;
+  }
+
+  /**
+   * Hash a long array using the x64 32 bit variant of MurmurHash3, i.e. the upper 32 bits of the 64 bit hash.
+   *
+   * @param key value to hash
+   * @param seed seed value
+   * @return 32 bit hashed key
+   */
+  private int murmur3Hash32BitsX64(final long[] key, final int seed) {
+    final long hash64 = murmur3Hash64BitsX64(key, seed);
+    return (int) (hash64 >>> 32);
+  }
+
+  /**
+   * Dispatches to the x64 32 bit hash matching the runtime type of {@code o}: {@code byte[]} and {@code long[]} are
+   * hashed directly, everything else is hashed through its string representation.
+   */
+  @VisibleForTesting
+  int murmur3Hash32BitsX64(Object o, int seed) {
+    if (o instanceof byte[]) {
+      return murmur3Hash32BitsX64((byte[]) o, seed);
+    }
+    if (o instanceof long[]) {
+      return murmur3Hash32BitsX64((long[]) o, seed);
+    }
+    // Differing from the source implementation here. The default case in the source implementation is to apply the
+    // hash on the hashcode of the object. The hashcode of an object is not guaranteed to be consistent across JVMs
+    // (except for String values), so we cannot guarantee the same value as the data source. Since we cannot
+    // guarantee similar values, we will instead apply the hash on the string representation of the object, which
+    // aligns with the rest of our code base. A String is used as is.
+    String text = o instanceof String ? (String) o : o.toString();
+    return murmur3Hash32BitsX64(text, seed);
+  }
+
+  // Hash a String using the x64 32 bit variant of MurmurHash3: the upper 32 bits of the 64 bit string hash.
+  // After narrowing to int, '>>>' and '>>' yield the same value; '>>>' matches the byte[] and long[] overloads.
+  private int murmur3Hash32BitsX64(String s, int seed) {
+    final long hash64 = murmur3Hash32BitsX64String(s, seed);
+    return (int) (hash64 >>> 32);
+  }
+
+  // Computes the 64 bit x64 MurmurHash3 of a String by encoding its code points to UTF-8 on the fly and feeding
+  // the bytes into the hash state one at a time. Despite the name, the returned value is the full 64 bit hash;
+  // the caller extracts the upper 32 bits. Unpaired surrogates are hashed as INVALID_CHAR ('?').
+  // NOTE(review): presumably yields the same value as hashing s.getBytes(UTF_8) with the byte[] overload - confirm.
+  private long murmur3Hash32BitsX64String(String s, long seed) {
+    // Exactly the same as MurmurHash3_x64_64, except it works directly on a String's chars
+    State state = new State();
+
+    state._h1 = 0x9368e53c2f6af274L ^ seed;
+    state._h2 = 0x586dcd208f7cd3fdL ^ seed;
+
+    state._c1 = 0x87c37b91114253d5L;
+    state._c2 = 0x4cf5ad432745937fL;
+
+    // Number of UTF-8 bytes produced so far; also selects the position of the next byte inside the block.
+    int byteLen = 0;
+    int stringLen = s.length();
+
+    // CHECKSTYLE:OFF
+    for (int i = 0; i < stringLen; i++) {
+      // Decode the next code point, combining a valid surrogate pair into one.
+      char c1 = s.charAt(i);
+      int cp;
+      if (!Character.isSurrogate(c1)) {
+        cp = c1;
+      } else if (Character.isHighSurrogate(c1)) {
+        if (i + 1 < stringLen) {
+          char c2 = s.charAt(i + 1);
+          if (Character.isLowSurrogate(c2)) {
+            i++;
+            cp = Character.toCodePoint(c1, c2);
+          } else {
+            cp = INVALID_CHAR;
+          }
+        } else {
+          cp = INVALID_CHAR;
+        }
+      } else {
+        cp = INVALID_CHAR;
+      }
+
+      // Emit the code point as 1 to 4 UTF-8 bytes.
+      if (cp <= 0x7f) {
+        addByte(state, (byte) cp, byteLen++);
+      } else if (cp <= 0x07ff) {
+        byte b1 = (byte) (0xc0 | (0x1f & (cp >> 6)));
+        byte b2 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+      } else if (cp <= 0xffff) {
+        byte b1 = (byte) (0xe0 | (0x0f & (cp >> 12)));
+        byte b2 = (byte) (0x80 | (0x3f & (cp >> 6)));
+        byte b3 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+        addByte(state, b3, byteLen++);
+      } else {
+        byte b1 = (byte) (0xf0 | (0x07 & (cp >> 18)));
+        byte b2 = (byte) (0x80 | (0x3f & (cp >> 12)));
+        byte b3 = (byte) (0x80 | (0x3f & (cp >> 6)));
+        byte b4 = (byte) (0x80 | (0x3f & cp));
+        addByte(state, b1, byteLen++);
+        addByte(state, b2, byteLen++);
+        addByte(state, b3, byteLen++);
+        addByte(state, b4, byteLen++);
+      }
+    }
+
+    // CHECKSTYLE:ON
+    // addByte() has accumulated the bytes of the last, incomplete block in _k1/_k2. Keep them aside and rebuild
+    // the tail below byte by byte, casting each byte back to a signed byte so it is sign-extended when widened.
+    long savedK1 = state._k1;
+    long savedK2 = state._k2;
+    state._k1 = 0;
+    state._k2 = 0;
+
+    // Intentional fall-through: start at the case matching the number of leftover bytes and run down to case 1.
+    // CHECKSTYLE:OFF
+    switch (byteLen & 15) {
+      case 15:
+        state._k2 ^= (long) ((byte) (savedK2 >> 48)) << 48;
+      case 14:
+        state._k2 ^= (long) ((byte) (savedK2 >> 40)) << 40;
+      case 13:
+        state._k2 ^= (long) ((byte) (savedK2 >> 32)) << 32;
+      case 12:
+        state._k2 ^= (long) ((byte) (savedK2 >> 24)) << 24;
+      case 11:
+        state._k2 ^= (long) ((byte) (savedK2 >> 16)) << 16;
+      case 10:
+        state._k2 ^= (long) ((byte) (savedK2 >> 8)) << 8;
+      case 9:
+        state._k2 ^= ((byte) savedK2);
+      case 8:
+        state._k1 ^= (long) ((byte) (savedK1 >> 56)) << 56;
+      case 7:
+        state._k1 ^= (long) ((byte) (savedK1 >> 48)) << 48;
+      case 6:
+        state._k1 ^= (long) ((byte) (savedK1 >> 40)) << 40;
+      case 5:
+        state._k1 ^= (long) ((byte) (savedK1 >> 32)) << 32;
+      case 4:
+        state._k1 ^= (long) ((byte) (savedK1 >> 24)) << 24;
+      case 3:
+        state._k1 ^= (long) ((byte) (savedK1 >> 16)) << 16;
+      case 2:
+        state._k1 ^= (long) ((byte) (savedK1 >> 8)) << 8;
+      case 1:
+        state._k1 ^= ((byte) savedK1);
+        bmix(state);
+    }
+    // CHECKSTYLE:ON
+    // Fold the number of encoded bytes into the hash before finalization.
+    state._h2 ^= byteLen;
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    state._h1 = fmix(state._h1);
+    state._h2 = fmix(state._h2);
+
+    state._h1 += state._h2;
+    state._h2 += state._h1;
+
+    return state._h1;
+  }
+
+  // Places byte b at position (len mod 16) of the current 16 byte block: positions 0-7 go into state._k1,
+  // positions 8-15 into state._k2, least significant byte first. Once position 15 has been written the block is
+  // mixed into the hash and both halves are cleared for the next block.
+  private void addByte(State state, byte b, int len) {
+    long positioned = (b & 0xffL) << ((len & 0x7) * 8);
+    boolean belongsToK1 = (len & 0x8) == 0;
+    if (belongsToK1) {
+      state._k1 |= positioned;
+      return;
+    }
+
+    state._k2 |= positioned;
+    boolean blockComplete = (len & 0xf) == 0xf;
+    if (blockComplete) {
+      bmix(state);
+      state._k1 = 0;
+      state._k2 = 0;
+    }
+  }
+
+  // Mutable working state shared by the x64 hash routines above.
+  static class State {
+    // Running hash words; _h1 is what the hash routines return.
+    long _h1;
+    long _h2;
+
+    // The two halves of the input block currently being mixed.
+    long _k1;
+    long _k2;
+
+    // Multipliers used by bmix(), which also updates them after every block.
+    long _c1;
+    long _c2;
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
index 77bfc4c08d..13a60b424b 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java
@@ -28,7 +28,7 @@ import java.util.Map;
 public class PartitionFunctionFactory {
   // Enum for various partition functions to be added.
   public enum PartitionFunctionType {
-    Modulo, Murmur, ByteArray, HashCode, BoundedColumnValue;
+    Modulo, Murmur, Murmur3, ByteArray, HashCode, BoundedColumnValue;
     // Add more functions here.
 
     private static final Map<String, PartitionFunctionType> VALUE_MAP = new HashMap<>();
@@ -77,6 +77,9 @@ public class PartitionFunctionFactory {
       case Murmur:
         return new MurmurPartitionFunction(numPartitions);
 
+      case Murmur3:
+        return new Murmur3PartitionFunction(numPartitions, functionConfig);
+
       case ByteArray:
         return new ByteArrayPartitionFunction(numPartitions);
 
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
index 924cc87ffc..dec22a663d 100644
--- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/PartitionFunctionTest.java
@@ -19,7 +19,9 @@
 package org.apache.pinot.segment.spi.partition;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -115,6 +117,233 @@ public class PartitionFunctionTest {
     }
   }
 
+  /**
+   * Unit test for {@link Murmur3PartitionFunction}.
+   * <ul>
+   *   <li> Tests that partition values are in expected range. </li>
+   *   <li> Tests that toString returns expected string. </li>
+   *   <li> Tests the default behaviors when functionConfig is not provided or only one of the optional parameters of
+   *   functionConfig is provided.</li>
+   * </ul>
+   */
+  @Test
+  public void testMurmur3Partitioner() {
+    long seed = System.currentTimeMillis();
+    Random random = new Random(seed);
+
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      int numPartitions = random.nextInt(MAX_NUM_PARTITIONS) + 1;
+
+      String functionName = "MurMUr3";
+      String valueTobeHashed = String.valueOf(random.nextInt());
+      Map<String, String> functionConfig = new HashMap<>();
+
+      // Create partition function with function config as null.
+      PartitionFunction partitionFunction1 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, null);
+
+      // Check getName and toString equivalence.
+      assertEquals(partitionFunction1.getName(), partitionFunction1.toString());
+
+      // Get partition number with random value.
+      int partitionNumWithNullConfig = partitionFunction1.getPartition(valueTobeHashed);
+
+      // Create partition function with an empty (non-null) function config, i.e. neither seed nor variant present.
+      PartitionFunction partitionFunction2 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Get partition number with random value.
+      int partitionNumWithNoSeedValue = partitionFunction2.getPartition(valueTobeHashed);
+
+      // The partition number with null function config and with empty function config should be equal.
+      assertEquals(partitionNumWithNullConfig, partitionNumWithNoSeedValue);
+
+      // Put random seed value in "seed" field in the function config.
+      functionConfig.put("seed", Integer.toString(random.nextInt()));
+
+      // Create partition function with a random seed value present in the function config and no variant.
+      PartitionFunction partitionFunction3 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Create partition function with function config present with random seed value
+      // and with variant provided as "x64_32" in function config.
+      functionConfig.put("variant", "x64_32");
+      PartitionFunction partitionFunction4 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Put variant value as "x86_32" in function config.
+      functionConfig.put("variant", "x86_32");
+
+      // Put seed value as 0 in function config.
+      functionConfig.put("seed", "0");
+
+      // Create partition function with function config present with variant provided as "x86_32" in function config.
+      PartitionFunction partitionFunction5 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Partition number should be equal as partitionNumWithNullConfig and partitionNumWithNoSeedValue as this is
+      // default behavior.
+      assertEquals(partitionFunction5.getPartition(valueTobeHashed), partitionNumWithNullConfig);
+
+      // Replace seed value as empty string in function config.
+      functionConfig.put("seed", "");
+
+      // Create partition function with function config present with variant provided as "x86_32" and empty seed
+      // value in functionConfig.
+      PartitionFunction partitionFunction6 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Partition number should be equal as partitionNumWithNullConfig and partitionNumWithNoSeedValue as this is
+      // default behavior.
+      assertEquals(partitionFunction6.getPartition(valueTobeHashed), partitionNumWithNullConfig);
+
+      // Replace variant value as empty string in function config.
+      functionConfig.put("variant", "");
+
+      // Create partition function with function config present with empty variant and empty seed.
+      PartitionFunction partitionFunction7 =
+          PartitionFunctionFactory.getPartitionFunction(functionName, numPartitions, functionConfig);
+
+      // Partition number should be equal as partitionNumWithNullConfig and partitionNumWithNoSeedValue as this is
+      // default behavior.
+      assertEquals(partitionFunction7.getPartition(valueTobeHashed), partitionNumWithNullConfig);
+
+      // NOTE(review): functionConfig is one shared map that has been mutated since partitionFunction2..6 were
+      // created, so the map passed below holds its final state (empty seed and variant) for all of them - confirm
+      // that testBasicProperties does not depend on the config each function was actually built with.
+      testBasicProperties(partitionFunction1, functionName, numPartitions);
+      testBasicProperties(partitionFunction2, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction3, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction4, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction5, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction6, functionName, numPartitions, functionConfig);
+      testBasicProperties(partitionFunction7, functionName, numPartitions, functionConfig);
+
+      for (int j = 0; j < NUM_ROUNDS; j++) {
+        int value = j == 0 ? Integer.MIN_VALUE : random.nextInt();
+
+        // check for the partition function with functionConfig as null.
+        testToStringAndPartitionNumber(partitionFunction1, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig but without seed value.
+        testToStringAndPartitionNumber(partitionFunction2, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig and with seed value.
+        testToStringAndPartitionNumber(partitionFunction3, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig and with seed value and variant.
+        testToStringAndPartitionNumber(partitionFunction4, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig and with explicitly provided default seed
+        // value and variant.
+        testToStringAndPartitionNumber(partitionFunction5, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig and with empty seed value and default variant.
+        testToStringAndPartitionNumber(partitionFunction6, value, numPartitions);
+
+        // check for the partition function with non-null functionConfig and with empty seed value and empty variant.
+        testToStringAndPartitionNumber(partitionFunction7, value, numPartitions);
+      }
+    }
+  }
+
+  @Test
+  public void testMurmur3Equivalence() {
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
+    // expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray = new int[]{
+        -1569442405, -921191038, 16439113, -881572510, 2111401876, 655879980, 1409856380, -1348364123, -1770645361,
+        1277101955
+    };
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
+    // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray = new int[]{
+        698083240, 174075836, -938825597, 155806634, -831733828, 319389887, -939822329, -1785781936, -1796939240,
+        757512622
+    };
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 0 to those values and stored in
+    // expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray = new int[]{
+        -621156783, -1341356662, 1615513844, 1608247599, -1339558745, -1782606270, 1204009437, 8939246, -42073819,
+        1268621125
+    };
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32 with seed = 9001 to those values and
+    // stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray = new int[]{
+        -159780599, 1266925141, -2039451704, 237323842, -1373894107, -1718192521, 314068498, 1377198162, 1239340429,
+        -1643307044
+    };
+
+    // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 0, applied right shift
+    // on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithZeroSeedForString.
+    int[] expectedMurmurValuesFor32BitX64WithZeroSeedForString = new int[]{
+        -930531654, 1010637996, -1251084035, -1551293561, 1591443335, 181872103, 1308755538, -432310401, -701537488,
+        674867586
+    };
+
+    // 10 String values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_64_String with seed = 9001, applied right
+    // shift on 32 bits to those values and stored in expectedMurmurValuesFor32BitX64WithNonZeroSeedForString.
+    int[] expectedMurmurValuesFor32BitX64WithNonZeroSeedForString = new int[]{
+        1558697417, 933581816, 1071120824, 1964512897, 1629803052, 2037246152, -1867319466, -1003065762, -275998120,
+        1386652858
+    };
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x86_32 with seed = 0 to those values and stored in
+    // expectedMurmurValuesFor32BitX86WithZeroSeed.
+    int[] expectedMurmurValuesFor32BitX86WithZeroSeed = new int[]{
+        1255034832, -395463542, 659973067, 1070436837, -1193041642, -1412829846, -483463488, -1385092001, 568671606,
+        -807299446
+    };
+
+    // 10 values of size 7, were randomly generated, using {@link Random::nextBytes} with seed 100
+    // Applied org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x86_32 with seed = 9001 to those values and
+    // stored in expectedMurmurValuesFor32BitX86WithNonZeroSeed.
+    int[] expectedMurmurValuesFor32BitX86WithNonZeroSeed = new int[]{
+        -590969347, -315366997, 1642137565, -1732240651, -597560989, -1430018124, -448506674, 410998174, -1912106487,
+        -19253806
+    };
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 0 for byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForByteArray,
+        "byteArray", "x64_32");
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 9001 for byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForByteArray,
+        "byteArray", "x64_32");
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 0 for long array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForLongArray,
+        "longArray", "x64_32");
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 9001 for long array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForLongArray,
+        "longArray", "x64_32");
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 0 for String.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX64WithZeroSeedForString, "String",
+        "x64_32");
+
+    // Test for 32 bit murmur3 hash with x64_32 variant and seed = 9001 for String.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX64WithNonZeroSeedForString,
+        "String", "x64_32");
+
+    // Test for 32 bit murmur3 hash with x86_32 variant and seed = 0 for byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(0, expectedMurmurValuesFor32BitX86WithZeroSeed, "byteArray",
+        "x86_32");
+
+    // Test for 32 bit murmur3 hash with x86_32 variant and seed = 9001 for byte array.
+    testMurmur3HashEquivalenceForDifferentDataTypes(9001, expectedMurmurValuesFor32BitX86WithNonZeroSeed, "byteArray",
+        "x86_32");
+  }
+
   /**
    * Unit test for {@link MurmurPartitionFunction}.
    * <ul>
@@ -275,6 +504,95 @@ public class PartitionFunctionTest {
     testPartitionFunctionEquivalence(murmurPartitionFunction, expectedPartitions);
   }
 
+  @Test
+  public void testMurmur3PartitionFunctionEquivalence() {
+
+    // 10 String values of size 7, randomly generated using {@link Random::nextBytes} with seed 100.
+    // Expected partitions of those values for the x64_32 variant with seed = 0 (reference hash:
+    // org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32).
+    int[] expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString = new int[]{
+        4, 1, 3, 2, 0, 3, 3, 2, 0, 1
+    };
+
+    // 10 String values of size 7, randomly generated using {@link Random::nextBytes} with seed 100.
+    // Expected partitions of those values for the x64_32 variant with seed = 9001 (reference hash:
+    // org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32).
+    int[] expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString = new int[]{
+        2, 1, 4, 2, 2, 2, 2, 1, 3, 3
+    };
+
+    // 10 long[] values of size 10, randomly generated using {@link Random::nextLong} with seed 100.
+    // Expected partitions of those values for the x64_32 variant with seed = 0 (reference hash:
+    // org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32).
+    int[] expectedPartitions32BitsX64WithZeroSeedForLongArray = new int[]{
+        0, 1, 4, 4, 3, 3, 2, 1, 4, 0
+    };
+
+    // 10 long[] values of size 10, randomly generated using {@link Random::nextLong} with seed 100.
+    // Expected partitions of those values for the x64_32 variant with seed = 9001 (reference hash:
+    // org.infinispan.commons.hash.MurmurHash3::MurmurHash3_x64_32).
+    int[] expectedPartitions32BitsX64WithNonZeroSeedForLongArray = new int[]{
+        4, 1, 4, 2, 1, 2, 3, 2, 4, 4
+    };
+
+    // 10 String values of size 7, randomly generated using {@link Random::nextBytes} with seed 100.
+    // Expected partitions of those values for the x86_32 variant with seed = 0 (reference hash:
+    // com.google.common.hash.hashing::murmur3_32_fixed).
+    int[] expectedPartitions32BitsX86WithZeroSeed = new int[]{
+        4, 3, 3, 2, 3, 4, 0, 3, 1, 4
+    };
+
+    // 10 String values of size 7, randomly generated using {@link Random::nextBytes} with seed 100.
+    // Expected partitions of those values for the x86_32 variant with seed = 9001 (reference hash:
+    // com.google.common.hash.hashing::murmur3_32_fixed).
+    int[] expectedPartitions32BitsX86WithNonZeroSeed = new int[]{
+        2, 1, 3, 2, 2, 1, 1, 4, 4, 2
+    };
+
+    // Every partition function uses 5 partitions and gets its own config map, so that no function depends on a
+    // map that is mutated after the function has been constructed.
+    int numPartitions = 5;
+
+    // x64_32 variant with seed = 0.
+    Map<String, String> x64Config = new HashMap<>();
+    x64Config.put("variant", "x64_32");
+    Murmur3PartitionFunction murmur3PartitionFunction1 = new Murmur3PartitionFunction(numPartitions, x64Config);
+
+    // x64_32 variant with seed = 9001.
+    Map<String, String> x64SeededConfig = new HashMap<>();
+    x64SeededConfig.put("variant", "x64_32");
+    x64SeededConfig.put("seed", Integer.toString(9001));
+    Murmur3PartitionFunction murmur3PartitionFunction2 = new Murmur3PartitionFunction(numPartitions, x64SeededConfig);
+
+    // x86_32 variant with seed = 0.
+    Murmur3PartitionFunction murmur3PartitionFunction3 = new Murmur3PartitionFunction(numPartitions, null);
+
+    // x86_32 variant with seed = 9001 (the config has no "variant" entry).
+    Map<String, String> x86SeededConfig = new HashMap<>();
+    x86SeededConfig.put("seed", Integer.toString(9001));
+    Murmur3PartitionFunction murmur3PartitionFunction4 = new Murmur3PartitionFunction(numPartitions, x86SeededConfig);
+
+    // Generate the same 10 String values. Test if the calculated partitions are equal for both String and byte[]
+    // (they should be equal when String is converted to byte[]) and if they are equal to the expected partitions
+    // for the x64_32 variant with seed = 0 and the x64_32 variant with seed = 9001.
+    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction1,
+        expectedPartitions32BitsX64WithZeroSeedForByteArrayAndString);
+    testPartitionFunctionEquivalenceWithStringAndByteArray(murmur3PartitionFunction2,
+        expectedPartitions32BitsX64WithNonZeroSeedForByteArrayAndString);
+
+    // Generate the same 10 long[] values. Test if the calculated partitions are equal to the expected partitions
+    // for the x64_32 variant with seed = 0 and the x64_32 variant with seed = 9001.
+    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction1,
+        expectedPartitions32BitsX64WithZeroSeedForLongArray);
+    testPartitionFunctionEquivalenceWithLongArray(murmur3PartitionFunction2,
+        expectedPartitions32BitsX64WithNonZeroSeedForLongArray);
+
+    // Generate the same 10 String values. Test if the calculated partitions are equal to the expected partitions
+    // for the x86_32 variant with seed = 0 and the x86_32 variant with seed = 9001.
+    testPartitionFunctionEquivalence(murmur3PartitionFunction3, expectedPartitions32BitsX86WithZeroSeed);
+    testPartitionFunctionEquivalence(murmur3PartitionFunction4, expectedPartitions32BitsX86WithNonZeroSeed);
+  }
+
   /**
    * Tests the equivalence of kafka.producer.ByteArrayPartitioner::partition and {@link ByteArrayPartitionFunction
    * ::getPartition}
@@ -309,4 +627,114 @@ public class PartitionFunctionTest {
       assertEquals(actualPartition, expectedPartition);
     }
   }
+
+  private void testPartitionFunctionEquivalenceWithStringAndByteArray(PartitionFunction partitionFunction,
+      int[] expectedPartitions) {
+    // Fixed seed, so that the generated inputs line up with the pre-computed expected partitions.
+    Random generator = new Random(100L);
+    byte[] buffer = new byte[7];
+
+    // For every expected partition: draw 7 random bytes, decode them as a UTF-8 String, and check that the
+    // partition computed from the String equals both the partition computed from the String's UTF-8 bytes
+    // and the expected partition.
+    for (int i = 0; i < expectedPartitions.length; i++) {
+      generator.nextBytes(buffer);
+      String value = new String(buffer, UTF_8);
+      int partitionFromString = partitionFunction.getPartition(value);
+      int partitionFromBytes = partitionFunction.getPartition(value.getBytes(UTF_8));
+      assertEquals(partitionFromString, partitionFromBytes);
+      assertEquals(partitionFromString, expectedPartitions[i]);
+    }
+  }
+
+  private void testPartitionFunctionEquivalenceWithLongArray(PartitionFunction partitionFunction,
+      int[] expectedPartitions) {
+    // Fixed seed, so that the generated inputs line up with the pre-computed expected partitions.
+    Random generator = new Random(100);
+
+    // Build 10 long[] values, each holding 10 values drawn from {@link Random#nextLong}.
+    List<long[]> inputs = new ArrayList<>();
+    for (int row = 0; row < 10; row++) {
+      long[] values = new long[10];
+      for (int col = 0; col < values.length; col++) {
+        values[col] = generator.nextLong();
+      }
+      inputs.add(values);
+    }
+
+    // Run each generated array through the partition function and check it against the expectation.
+    int index = 0;
+    for (long[] input : inputs) {
+      int partitionFromLongArray = partitionFunction.getPartition(input);
+      assertEquals(partitionFromLongArray, expectedPartitions[index++]);
+    }
+  }
+
+  /**
+   * Checks the murmur3 hashes of generated inputs of the given data type against the expected hash values.
+   */
+  private void testMurmur3HashEquivalenceForDifferentDataTypes(int hashSeed, int[] expectedHashValues, String dataType,
+      String variant) {
+    // All inputs come from a Random seeded with 100, so that they line up with the pre-computed expected hashes.
+    // "variant" is only consulted for byte[] input; String and long[] input always go through the x64_32 hash.
+    long seed = 100;
+    Random random = new Random(seed);
+    int numPartitions = 5;
+    Murmur3PartitionFunction murmur3PartitionFunction = new Murmur3PartitionFunction(numPartitions, null);
+
+    switch (dataType.toLowerCase()) {
+      case "string":
+        // For each expected hash: draw 7 random bytes, decode them as a UTF-8 String and hash that String.
+        byte[] array1 = new byte[7];
+        for (int expectedHashValue : expectedHashValues) {
+          random.nextBytes(array1);
+          String nextString = new String(array1, UTF_8);
+          int actualHashValueFromString = murmur3PartitionFunction.murmur3Hash32BitsX64(nextString, hashSeed);
+          assertEquals(actualHashValueFromString, expectedHashValue);
+        }
+        break;
+      case "bytearray":
+        // For each expected hash: draw 7 random bytes and hash them directly with the requested variant.
+        byte[] array2 = new byte[7];
+        for (int expectedHashValue : expectedHashValues) {
+          random.nextBytes(array2);
+          int actualHashValueFromByteArray;
+          if (variant.equals("x64_32")) {
+            actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX64(array2, hashSeed);
+          } else {
+            actualHashValueFromByteArray = murmur3PartitionFunction.murmur3Hash32BitsX86(array2, hashSeed);
+          }
+          assertEquals(actualHashValueFromByteArray, expectedHashValue);
+        }
+        break;
+      case "longarray":
+        // Create a list of 10 long[] values, each of size 10, using {@link Random::nextLong}.
+        List<long[]> longList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+          long[] longArray = new long[10];
+          for (int j = 0; j < 10; j++) {
+            longArray[j] = random.nextLong();
+          }
+          longList.add(longArray);
+        }
+
+        // Hash every generated array and compare with the expected hash values.
+        for (int i = 0; i < 10; i++) {
+          int actualHashValueFromLongArray = murmur3PartitionFunction.murmur3Hash32BitsX64(longList.get(i), hashSeed);
+          assertEquals(actualHashValueFromLongArray, expectedHashValues[i]);
+        }
+        break;
+      default:
+        // Fail loudly instead of silently passing when the caller misspells the data type.
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+  }
+
+  private void testToStringAndPartitionNumber(PartitionFunction partitionFunction, int testValueForGetPartition,
+      int numPartitions) {
+    // An int and its decimal String form must land in the same partition, within [0, numPartitions).
+    int partitionFromInt = partitionFunction.getPartition(testValueForGetPartition);
+    assertEquals(partitionFromInt, partitionFunction.getPartition(String.valueOf(testValueForGetPartition)));
+    assertTrue(0 <= partitionFromInt && partitionFromInt < numPartitions);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org