You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/08/21 22:45:32 UTC

git commit: add Murmur3Partitioner and make it default for new installations patch by Dave Brosius and Pavel Yaskevich; reviewed by Vijay for CASSANDRA-3772

Updated Branches:
  refs/heads/trunk dafcaeb06 -> f41684fde


add Murmur3Partitioner and make it default for new installations
patch by Dave Brosius and Pavel Yaskevich; reviewed by Vijay for CASSANDRA-3772


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f41684fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f41684fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f41684fd

Branch: refs/heads/trunk
Commit: f41684fdef7a9e8628cb40f66c13a88fcf7502e3
Parents: dafcaeb
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Aug 21 23:40:31 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Aug 21 23:40:31 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 conf/cassandra.yaml                                |    4 +-
 .../cassandra/dht/AbstractHashedPartitioner.java   |  194 +++++++++++++++
 .../apache/cassandra/dht/Murmur3Partitioner.java   |   53 ++++
 .../apache/cassandra/dht/RandomPartitioner.java    |  160 +------------
 5 files changed, 255 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 426ac7d..8fe1770 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,7 @@
  * (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
  * (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
  * remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487)
+ * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772)
 
 
 1.1.5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 1b89b2e..5e45961 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -70,6 +70,8 @@ authority: org.apache.cassandra.auth.AllowAllAuthority
 # 
 # - RandomPartitioner distributes rows across the cluster evenly by md5.
 #   When in doubt, this is the best option.
+# - Murmur3Partitioner is similar to RandomPartitioner but uses the
+#   Murmur3_128 hash function instead of md5.
 # - ByteOrderedPartitioner orders rows lexically by key bytes.  BOP allows
 #   scanning rows in key order, but the ordering can generate hot spots
 #   for sequential insertion workloads.
@@ -81,7 +83,7 @@ authority: org.apache.cassandra.auth.AllowAllAuthority
 #
 # See http://wiki.apache.org/cassandra/Operations for more on
 # partitioners and token selection.
-partitioner: org.apache.cassandra.dht.RandomPartitioner
+partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 
 # directories where Cassandra should store data on disk.
 data_file_directories:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
new file mode 100644
index 0000000..55dfb97
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * This class is the super class of classes that generate a BigIntegerToken using a hash function.
+ */
+public abstract class AbstractHashedPartitioner extends AbstractPartitioner<BigIntegerToken>
+{
+    public static final BigInteger ZERO = new BigInteger("0");
+    public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
+    public static final BigInteger MAXIMUM = new BigInteger("2").pow(127);
+
+    private static final byte DELIMITER_BYTE = ":".getBytes()[0];
+
+    /**
+     * returns a hash of the byte buffer in the range of 0 - 2**127 as a BigInteger
+     *
+     * @param buffer the buffer to hash
+     * @return the BigInteger hash value
+     */
+    protected abstract BigInteger hash(ByteBuffer buffer);
+
+    public DecoratedKey decorateKey(ByteBuffer key)
+    {
+        return new DecoratedKey(getToken(key), key);
+    }
+
+    public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk)
+    {
+        // find the delimiter position
+        int splitPoint = -1;
+        for (int i = fromdisk.position(); i < fromdisk.limit(); i++)
+        {
+            if (fromdisk.get(i) == DELIMITER_BYTE)
+            {
+                splitPoint = i;
+                break;
+            }
+        }
+        assert splitPoint != -1;
+
+        // and decode the token and key
+        String token = null;
+        try
+        {
+            token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position());
+        }
+        catch (CharacterCodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+        ByteBuffer key = fromdisk.duplicate();
+        key.position(splitPoint + 1);
+        return new DecoratedKey(new BigIntegerToken(token), key);
+    }
+
+    public Token midpoint(Token ltoken, Token rtoken)
+    {
+        // the symbolic MINIMUM token should act as ZERO: the empty bit array
+        BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
+        BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
+        Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
+        // discard the remainder
+        return new BigIntegerToken(midpair.left);
+    }
+
+    public BigIntegerToken getMinimumToken()
+    {
+        return MINIMUM;
+    }
+
+    public BigIntegerToken getRandomToken()
+    {
+        BigInteger token = hash(GuidGenerator.guidAsBytes());
+        if ( token.signum() == -1 )
+            token = token.multiply(BigInteger.valueOf(-1L));
+        return new BigIntegerToken(token);
+    }
+
+    private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
+        public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken)
+        {
+            return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
+        }
+
+        public Token<BigInteger> fromByteArray(ByteBuffer bytes)
+        {
+            return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
+        }
+
+        public String toString(Token<BigInteger> bigIntegerToken)
+        {
+            return bigIntegerToken.token.toString();
+        }
+
+        public void validate(String token) throws ConfigurationException
+        {
+            try
+            {
+                BigInteger i = new BigInteger(token);
+                if (i.compareTo(ZERO) < 0)
+                    throw new ConfigurationException("Token must be >= 0");
+                if (i.compareTo(MAXIMUM) > 0)
+                    throw new ConfigurationException("Token must be <= 2**127");
+            }
+            catch (NumberFormatException e)
+            {
+                throw new ConfigurationException(e.getMessage());
+            }
+        }
+
+        public Token<BigInteger> fromString(String string)
+        {
+            return new BigIntegerToken(new BigInteger(string));
+        }
+    };
+
+    public Token.TokenFactory<BigInteger> getTokenFactory()
+    {
+        return tokenFactory;
+    }
+
+    public boolean preservesOrder()
+    {
+        return false;
+    }
+
+    public BigIntegerToken getToken(ByteBuffer key)
+    {
+        if (key.remaining() == 0)
+            return MINIMUM;
+        return new BigIntegerToken(hash(key));
+    }
+
+    public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
+    {
+        Map<Token, Float> ownerships = new HashMap<Token, Float>();
+        Iterator i = sortedTokens.iterator();
+
+        // 0-case
+        if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); }
+        // 1-case
+        if (sortedTokens.size() == 1) {
+            ownerships.put((Token)i.next(), new Float(1.0));
+        }
+        // n-case
+        else {
+            // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers.
+            final BigInteger ri = MAXIMUM;                                                  //  (used for addition later)
+            final BigDecimal r  = new BigDecimal(ri);                                       // The entire range, 2**127
+            Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token;  // The first token and its value
+            Token t; BigInteger tim1 = ti;                                                  // The last token and its value (after loop)
+            while (i.hasNext()) {
+                t = (Token)i.next(); ti = ((BigIntegerToken)t).token;                               // The next token and its value
+                float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R
+                ownerships.put(t, x);                                                               // save (T(i) -> %age)
+                tim1 = ti;                                                                          // -> advance loop
+            }
+            // The start token's range extends backward to the last token, which is why both were saved above.
+            float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue();
+            ownerships.put(start, x);
+        }
+        return ownerships;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
new file mode 100644
index 0000000..775cef9
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.MurmurHash;
+
+/**
+ * This class generates a BigIntegerToken using a Murmur3 hash.
+ */
+public class Murmur3Partitioner extends AbstractHashedPartitioner
+{
+    protected BigInteger hash(ByteBuffer buffer)
+    {
+        long[] bufferHash = MurmurHash.hash3_x64_128(buffer, buffer.position(), buffer.remaining(), 0);
+        byte[] hashBytes = new byte[16];
+
+        writeLong(bufferHash[0], hashBytes, 0);
+        writeLong(bufferHash[1], hashBytes, 8);
+        // make sure it's positive; this isn't the same as abs() but doesn't affect distribution
+        hashBytes[0] = (byte) (hashBytes[0] & 0x7F);
+        return new BigInteger(hashBytes);
+    }
+
+    public static void writeLong(long src, byte[] dest, int offset)
+    {
+        dest[offset] = (byte) (src >> 56);
+        dest[offset + 1] = (byte) (src >> 48);
+        dest[offset + 2] = (byte) (src >> 40);
+        dest[offset + 3] = (byte) (src >> 32);
+        dest[offset + 4] = (byte) (src >> 24);
+        dest[offset + 5] = (byte) (src >> 16);
+        dest[offset + 6] = (byte) (src >> 8);
+        dest[offset + 7] = (byte) (src);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index cf3855b..5b95454 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -17,170 +17,18 @@
  */
 package org.apache.cassandra.dht;
 
-import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
 
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.Pair;
 
 /**
- * This class generates a BigIntegerToken using MD5 hash.
+ * This class generates a BigIntegerToken using an MD5 hash.
  */
-public class RandomPartitioner extends AbstractPartitioner<BigIntegerToken>
+public class RandomPartitioner extends AbstractHashedPartitioner
 {
-    public static final BigInteger ZERO = new BigInteger("0");
-    public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
-    public static final BigInteger MAXIMUM = new BigInteger("2").pow(127);
-
-    private static final byte DELIMITER_BYTE = ":".getBytes()[0];
-
-    public DecoratedKey decorateKey(ByteBuffer key)
-    {
-        return new DecoratedKey(getToken(key), key);
-    }
-
-    public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk)
-    {
-        // find the delimiter position
-        int splitPoint = -1;
-        for (int i = fromdisk.position(); i < fromdisk.limit(); i++)
-        {
-            if (fromdisk.get(i) == DELIMITER_BYTE)
-            {
-                splitPoint = i;
-                break;
-            }
-        }
-        assert splitPoint != -1;
-
-        // and decode the token and key
-        String token = null;
-        try
-        {
-            token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position());
-        }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e);
-        }
-        ByteBuffer key = fromdisk.duplicate();
-        key.position(splitPoint + 1);
-        return new DecoratedKey(new BigIntegerToken(token), key);
-    }
-
-    public Token midpoint(Token ltoken, Token rtoken)
+    protected BigInteger hash(ByteBuffer buffer)
     {
-        // the symbolic MINIMUM token should act as ZERO: the empty bit array
-        BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
-        BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
-        Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
-        // discard the remainder
-        return new BigIntegerToken(midpair.left);
-    }
-
-    public BigIntegerToken getMinimumToken()
-    {
-        return MINIMUM;
-    }
-
-    public BigIntegerToken getRandomToken()
-    {
-        BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes());
-        if ( token.signum() == -1 )
-            token = token.multiply(BigInteger.valueOf(-1L));
-        return new BigIntegerToken(token);
-    }
-
-    private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
-        public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken)
-        {
-            return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
-        }
-
-        public Token<BigInteger> fromByteArray(ByteBuffer bytes)
-        {
-            return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
-        }
-
-        public String toString(Token<BigInteger> bigIntegerToken)
-        {
-            return bigIntegerToken.token.toString();
-        }
-
-        public void validate(String token) throws ConfigurationException
-        {
-            try
-            {
-                BigInteger i = new BigInteger(token);
-                if (i.compareTo(ZERO) < 0)
-                    throw new ConfigurationException("Token must be >= 0");
-                if (i.compareTo(MAXIMUM) > 0)
-                    throw new ConfigurationException("Token must be <= 2**127");
-            }
-            catch (NumberFormatException e)
-            {
-                throw new ConfigurationException(e.getMessage());
-            }
-        }
-
-        public Token<BigInteger> fromString(String string)
-        {
-            return new BigIntegerToken(new BigInteger(string));
-        }
-    };
-
-    public Token.TokenFactory<BigInteger> getTokenFactory()
-    {
-        return tokenFactory;
-    }
-
-    public boolean preservesOrder()
-    {
-        return false;
-    }
-
-    public BigIntegerToken getToken(ByteBuffer key)
-    {
-        if (key.remaining() == 0)
-            return MINIMUM;
-        return new BigIntegerToken(FBUtilities.hashToBigInteger(key));
-    }
-
-    public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
-    {
-        Map<Token, Float> ownerships = new HashMap<Token, Float>();
-        Iterator i = sortedTokens.iterator();
-
-        // 0-case
-        if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); }
-        // 1-case
-        if (sortedTokens.size() == 1) {
-            ownerships.put((Token)i.next(), new Float(1.0));
-        }
-        // n-case
-        else {
-            // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers.
-            final BigInteger ri = MAXIMUM;                                                  //  (used for addition later)
-            final BigDecimal r  = new BigDecimal(ri);                                       // The entire range, 2**127
-            Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token;  // The first token and its value
-            Token t; BigInteger tim1 = ti;                                                  // The last token and its value (after loop)
-            while (i.hasNext()) {
-                t = (Token)i.next(); ti = ((BigIntegerToken)t).token;                               // The next token and its value
-                float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R
-                ownerships.put(t, x);                                                               // save (T(i) -> %age)
-                tim1 = ti;                                                                          // -> advance loop
-            }
-            // The start token's range extends backward to the last token, which is why both were saved above.
-            float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue();
-            ownerships.put(start, x);
-        }
-        return ownerships;
+        return FBUtilities.hashToBigInteger(buffer);
     }
 }