You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by xe...@apache.org on 2012/08/21 22:45:32 UTC
git commit: add Murmur3Partitioner and make it default for new
installations patch by Dave Brosius and Pavel Yaskevich;
reviewed by Vijay for CASSANDRA-3772
Updated Branches:
refs/heads/trunk dafcaeb06 -> f41684fde
add Murmur3Partitioner and make it default for new installations
patch by Dave Brosius and Pavel Yaskevich; reviewed by Vijay for CASSANDRA-3772
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f41684fd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f41684fd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f41684fd
Branch: refs/heads/trunk
Commit: f41684fdef7a9e8628cb40f66c13a88fcf7502e3
Parents: dafcaeb
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Tue Aug 21 23:40:31 2012 +0300
Committer: Pavel Yaskevich <xe...@apache.org>
Committed: Tue Aug 21 23:40:31 2012 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
conf/cassandra.yaml | 4 +-
.../cassandra/dht/AbstractHashedPartitioner.java | 194 +++++++++++++++
.../apache/cassandra/dht/Murmur3Partitioner.java | 53 ++++
.../apache/cassandra/dht/RandomPartitioner.java | 160 +------------
5 files changed, 255 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 426ac7d..8fe1770 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -38,6 +38,7 @@
* (cql3) Add support for 2ndary indexes (CASSANDRA-3680)
* (cql3) fix defining more than one PK to be invalid (CASSANDRA-4477)
* remove schema agreement checking from all external APIs (Thrift, CQL and CQL3) (CASSANDRA-4487)
+ * add Murmur3Partitioner and make it default for new installations (CASSANDRA-3772)
1.1.5
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 1b89b2e..5e45961 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -70,6 +70,8 @@ authority: org.apache.cassandra.auth.AllowAllAuthority
#
# - RandomPartitioner distributes rows across the cluster evenly by md5.
# When in doubt, this is the best option.
+# - Murmur3Partitioner is similar to RandomPartitioner but uses Murmur3_128
+# Hash Function instead of md5
# - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows
# scanning rows in key order, but the ordering can generate hot spots
# for sequential insertion workloads.
@@ -81,7 +83,7 @@ authority: org.apache.cassandra.auth.AllowAllAuthority
#
# See http://wiki.apache.org/cassandra/Operations for more on
# partitioners and token selection.
-partitioner: org.apache.cassandra.dht.RandomPartitioner
+partitioner: org.apache.cassandra.dht.Murmur3Partitioner
# directories where Cassandra should store data on disk.
data_file_directories:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
new file mode 100644
index 0000000..55dfb97
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/AbstractHashedPartitioner.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
+
+/**
+ * This class is the super class of classes that generate a BigIntegerToken using a hash function.
+ */
+public abstract class AbstractHashedPartitioner extends AbstractPartitioner<BigIntegerToken>
+{
+ public static final BigInteger ZERO = new BigInteger("0");
+ public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
+ public static final BigInteger MAXIMUM = new BigInteger("2").pow(127);
+
+ private static final byte DELIMITER_BYTE = ":".getBytes()[0];
+
+ /**
+ * returns a hash of the byte buffer in the range of 0 - 2**127 as a BigInteger
+ *
+ * @param buffer the buffer to hash
+ * @return the BigInteger hash value
+ */
+ protected abstract BigInteger hash(ByteBuffer buffer);
+
+ public DecoratedKey decorateKey(ByteBuffer key)
+ {
+ return new DecoratedKey(getToken(key), key);
+ }
+
+ public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk)
+ {
+ // find the delimiter position
+ int splitPoint = -1;
+ for (int i = fromdisk.position(); i < fromdisk.limit(); i++)
+ {
+ if (fromdisk.get(i) == DELIMITER_BYTE)
+ {
+ splitPoint = i;
+ break;
+ }
+ }
+ assert splitPoint != -1;
+
+ // and decode the token and key
+ String token = null;
+ try
+ {
+ token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position());
+ }
+ catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ ByteBuffer key = fromdisk.duplicate();
+ key.position(splitPoint + 1);
+ return new DecoratedKey(new BigIntegerToken(token), key);
+ }
+
+ public Token midpoint(Token ltoken, Token rtoken)
+ {
+ // the symbolic MINIMUM token should act as ZERO: the empty bit array
+ BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
+ BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
+ Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
+ // discard the remainder
+ return new BigIntegerToken(midpair.left);
+ }
+
+ public BigIntegerToken getMinimumToken()
+ {
+ return MINIMUM;
+ }
+
+ public BigIntegerToken getRandomToken()
+ {
+ BigInteger token = hash(GuidGenerator.guidAsBytes());
+ if ( token.signum() == -1 )
+ token = token.multiply(BigInteger.valueOf(-1L));
+ return new BigIntegerToken(token);
+ }
+
+ private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
+ public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken)
+ {
+ return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
+ }
+
+ public Token<BigInteger> fromByteArray(ByteBuffer bytes)
+ {
+ return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
+ }
+
+ public String toString(Token<BigInteger> bigIntegerToken)
+ {
+ return bigIntegerToken.token.toString();
+ }
+
+ public void validate(String token) throws ConfigurationException
+ {
+ try
+ {
+ BigInteger i = new BigInteger(token);
+ if (i.compareTo(ZERO) < 0)
+ throw new ConfigurationException("Token must be >= 0");
+ if (i.compareTo(MAXIMUM) > 0)
+ throw new ConfigurationException("Token must be <= 2**127");
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(e.getMessage());
+ }
+ }
+
+ public Token<BigInteger> fromString(String string)
+ {
+ return new BigIntegerToken(new BigInteger(string));
+ }
+ };
+
+ public Token.TokenFactory<BigInteger> getTokenFactory()
+ {
+ return tokenFactory;
+ }
+
+ public boolean preservesOrder()
+ {
+ return false;
+ }
+
+ public BigIntegerToken getToken(ByteBuffer key)
+ {
+ if (key.remaining() == 0)
+ return MINIMUM;
+ return new BigIntegerToken(hash(key));
+ }
+
+ public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
+ {
+ Map<Token, Float> ownerships = new HashMap<Token, Float>();
+ Iterator i = sortedTokens.iterator();
+
+ // 0-case
+ if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); }
+ // 1-case
+ if (sortedTokens.size() == 1) {
+ ownerships.put((Token)i.next(), new Float(1.0));
+ }
+ // n-case
+ else {
+ // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers.
+ final BigInteger ri = MAXIMUM; // (used for addition later)
+ final BigDecimal r = new BigDecimal(ri); // The entire range, 2**127
+ Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token; // The first token and its value
+ Token t; BigInteger tim1 = ti; // The last token and its value (after loop)
+ while (i.hasNext()) {
+ t = (Token)i.next(); ti = ((BigIntegerToken)t).token; // The next token and its value
+ float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R
+ ownerships.put(t, x); // save (T(i) -> %age)
+ tim1 = ti; // -> advance loop
+ }
+ // The start token's range extends backward to the last token, which is why both were saved above.
+ float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue();
+ ownerships.put(start, x);
+ }
+ return ownerships;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
new file mode 100644
index 0000000..775cef9
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.MurmurHash;
+
+/**
+ * This class generates a BigIntegerToken using a Murmur3 hash.
+ */
+public class Murmur3Partitioner extends AbstractHashedPartitioner
+{
+ protected BigInteger hash(ByteBuffer buffer)
+ {
+ long[] bufferHash = MurmurHash.hash3_x64_128(buffer, buffer.position(), buffer.remaining(), 0);
+ byte[] hashBytes = new byte[16];
+
+ writeLong(bufferHash[0], hashBytes, 0);
+ writeLong(bufferHash[1], hashBytes, 8);
+ // make sure it's positive, this isn't the same as abs() but doesn't affect distribution
+ hashBytes[0] = (byte) (hashBytes[0] & 0x7F);
+ return new BigInteger(hashBytes);
+ }
+
+ public static void writeLong(long src, byte[] dest, int offset)
+ {
+ dest[offset] = (byte) (src >> 56);
+ dest[offset + 1] = (byte) (src >> 48);
+ dest[offset + 2] = (byte) (src >> 40);
+ dest[offset + 3] = (byte) (src >> 32);
+ dest[offset + 4] = (byte) (src >> 24);
+ dest[offset + 5] = (byte) (src >> 16);
+ dest[offset + 6] = (byte) (src >> 8);
+ dest[offset + 7] = (byte) (src);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f41684fd/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index cf3855b..5b95454 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -17,170 +17,18 @@
*/
package org.apache.cassandra.dht;
-import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.util.*;
-import org.apache.cassandra.config.ConfigurationException;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
-import org.apache.cassandra.utils.Pair;
/**
- * This class generates a BigIntegerToken using MD5 hash.
+ * This class generates a BigIntegerToken using an MD5 hash.
*/
-public class RandomPartitioner extends AbstractPartitioner<BigIntegerToken>
+public class RandomPartitioner extends AbstractHashedPartitioner
{
- public static final BigInteger ZERO = new BigInteger("0");
- public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1");
- public static final BigInteger MAXIMUM = new BigInteger("2").pow(127);
-
- private static final byte DELIMITER_BYTE = ":".getBytes()[0];
-
- public DecoratedKey decorateKey(ByteBuffer key)
- {
- return new DecoratedKey(getToken(key), key);
- }
-
- public DecoratedKey convertFromDiskFormat(ByteBuffer fromdisk)
- {
- // find the delimiter position
- int splitPoint = -1;
- for (int i = fromdisk.position(); i < fromdisk.limit(); i++)
- {
- if (fromdisk.get(i) == DELIMITER_BYTE)
- {
- splitPoint = i;
- break;
- }
- }
- assert splitPoint != -1;
-
- // and decode the token and key
- String token = null;
- try
- {
- token = ByteBufferUtil.string(fromdisk, fromdisk.position(), splitPoint - fromdisk.position());
- }
- catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
- ByteBuffer key = fromdisk.duplicate();
- key.position(splitPoint + 1);
- return new DecoratedKey(new BigIntegerToken(token), key);
- }
-
- public Token midpoint(Token ltoken, Token rtoken)
+ protected BigInteger hash(ByteBuffer buffer)
{
- // the symbolic MINIMUM token should act as ZERO: the empty bit array
- BigInteger left = ltoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)ltoken).token;
- BigInteger right = rtoken.equals(MINIMUM) ? ZERO : ((BigIntegerToken)rtoken).token;
- Pair<BigInteger,Boolean> midpair = FBUtilities.midpoint(left, right, 127);
- // discard the remainder
- return new BigIntegerToken(midpair.left);
- }
-
- public BigIntegerToken getMinimumToken()
- {
- return MINIMUM;
- }
-
- public BigIntegerToken getRandomToken()
- {
- BigInteger token = FBUtilities.hashToBigInteger(GuidGenerator.guidAsBytes());
- if ( token.signum() == -1 )
- token = token.multiply(BigInteger.valueOf(-1L));
- return new BigIntegerToken(token);
- }
-
- private final Token.TokenFactory<BigInteger> tokenFactory = new Token.TokenFactory<BigInteger>() {
- public ByteBuffer toByteArray(Token<BigInteger> bigIntegerToken)
- {
- return ByteBuffer.wrap(bigIntegerToken.token.toByteArray());
- }
-
- public Token<BigInteger> fromByteArray(ByteBuffer bytes)
- {
- return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
- }
-
- public String toString(Token<BigInteger> bigIntegerToken)
- {
- return bigIntegerToken.token.toString();
- }
-
- public void validate(String token) throws ConfigurationException
- {
- try
- {
- BigInteger i = new BigInteger(token);
- if (i.compareTo(ZERO) < 0)
- throw new ConfigurationException("Token must be >= 0");
- if (i.compareTo(MAXIMUM) > 0)
- throw new ConfigurationException("Token must be <= 2**127");
- }
- catch (NumberFormatException e)
- {
- throw new ConfigurationException(e.getMessage());
- }
- }
-
- public Token<BigInteger> fromString(String string)
- {
- return new BigIntegerToken(new BigInteger(string));
- }
- };
-
- public Token.TokenFactory<BigInteger> getTokenFactory()
- {
- return tokenFactory;
- }
-
- public boolean preservesOrder()
- {
- return false;
- }
-
- public BigIntegerToken getToken(ByteBuffer key)
- {
- if (key.remaining() == 0)
- return MINIMUM;
- return new BigIntegerToken(FBUtilities.hashToBigInteger(key));
- }
-
- public Map<Token, Float> describeOwnership(List<Token> sortedTokens)
- {
- Map<Token, Float> ownerships = new HashMap<Token, Float>();
- Iterator i = sortedTokens.iterator();
-
- // 0-case
- if (!i.hasNext()) { throw new RuntimeException("No nodes present in the cluster. How did you call this?"); }
- // 1-case
- if (sortedTokens.size() == 1) {
- ownerships.put((Token)i.next(), new Float(1.0));
- }
- // n-case
- else {
- // NOTE: All divisions must take place in BigDecimals, and all modulo operators must take place in BigIntegers.
- final BigInteger ri = MAXIMUM; // (used for addition later)
- final BigDecimal r = new BigDecimal(ri); // The entire range, 2**127
- Token start = (Token)i.next(); BigInteger ti = ((BigIntegerToken)start).token; // The first token and its value
- Token t; BigInteger tim1 = ti; // The last token and its value (after loop)
- while (i.hasNext()) {
- t = (Token)i.next(); ti = ((BigIntegerToken)t).token; // The next token and its value
- float x = new BigDecimal(ti.subtract(tim1).add(ri).mod(ri)).divide(r).floatValue(); // %age = ((T(i) - T(i-1) + R) % R) / R
- ownerships.put(t, x); // save (T(i) -> %age)
- tim1 = ti; // -> advance loop
- }
- // The start token's range extends backward to the last token, which is why both were saved above.
- float x = new BigDecimal(((BigIntegerToken)start).token.subtract(ti).add(ri).mod(ri)).divide(r).floatValue();
- ownerships.put(start, x);
- }
- return ownerships;
+ return FBUtilities.hashToBigInteger(buffer);
}
}