You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/09/19 23:38:15 UTC
cassandra git commit: Make randompartitioner work with new vnode allocation
Repository: cassandra
Updated Branches:
refs/heads/trunk f42e235b1 -> c1a9a47df
Make randompartitioner work with new vnode allocation
patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12647
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1a9a47d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1a9a47d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1a9a47d
Branch: refs/heads/trunk
Commit: c1a9a47df292dbbde3c675c10d68043e7b212c28
Parents: f42e235
Author: Dikang Gu <di...@gmail.com>
Authored: Wed Sep 14 23:04:14 2016 -0700
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Mon Sep 19 16:36:34 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/dht/RandomPartitioner.java | 13 +++
.../ReplicationAwareTokenAllocatorTest.java | 84 +++++++++++++++-----
3 files changed, 76 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8e39d95..b625a58 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
* Fix cassandra-stress graphing (CASSANDRA-12237)
* Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
* Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/src/java/org/apache/cassandra/dht/RandomPartitioner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
index c063be3..7c8f6ac 100644
--- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java
@@ -177,6 +177,19 @@ public class RandomPartitioner implements IPartitioner
{
return HEAP_SIZE;
}
+
+ public Token increaseSlightly()
+ {
+ return new BigIntegerToken(token.add(BigInteger.ONE));
+ }
+
+ public double size(Token next)
+ {
+ BigIntegerToken n = (BigIntegerToken) next;
+ BigInteger v = n.token.subtract(token); // Overflow acceptable and desired.
+ double d = Math.scalb(v.doubleValue(), -127); // Scale so that the full range is 1.
+ return d > 0.0 ? d : (d + 1.0); // Adjust for signed long, also making sure t.size(t) == 1.
+ }
}
public BigIntegerToken getToken(ByteBuffer key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1a9a47d/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
index 1b36c55..482e2ac 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/ReplicationAwareTokenAllocatorTest.java
@@ -30,7 +30,9 @@ import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.junit.Test;
import org.apache.cassandra.Util;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
public class ReplicationAwareTokenAllocatorTest
@@ -489,10 +491,10 @@ public class ReplicationAwareTokenAllocatorTest
}
};
- Murmur3Partitioner partitioner = new Murmur3Partitioner();
Random seededRand = new Random(2);
- private void random(Map<Token, Unit> map, TestReplicationStrategy rs, int unitCount, TokenCount tc, int perUnitCount)
+ private void random(Map<Token, Unit> map, TestReplicationStrategy rs,
+ int unitCount, TokenCount tc, int perUnitCount, IPartitioner partitioner)
{
System.out.format("\nRandom generation of %d units with %d tokens each\n", unitCount, perUnitCount);
Random rand = seededRand;
@@ -509,49 +511,82 @@ public class ReplicationAwareTokenAllocatorTest
}
@Test
- public void testExistingCluster()
+ public void testExistingClusterWithRandomPartitioner()
+ {
+ testExistingCluster(new RandomPartitioner());
+ }
+
+ @Test
+ public void testExistingClusterWithMurmur3Partitioner()
+ {
+ testExistingCluster(new Murmur3Partitioner());
+ }
+
+ public void testExistingCluster(IPartitioner partitioner)
{
for (int rf = 1; rf <= 5; ++rf)
{
for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
{
- testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf));
- testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf));
+ testExistingCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner);
+ testExistingCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner);
if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1.
for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 4 < TARGET_CLUSTER_SIZE; groupSize *= 4)
{
- testExistingCluster(perUnitCount, fixedTokenCount, new BalancedGroupReplicationStrategy(rf, groupSize));
- testExistingCluster(perUnitCount, varyingTokenCount, new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand));
+ testExistingCluster(perUnitCount, fixedTokenCount,
+ new BalancedGroupReplicationStrategy(rf, groupSize), partitioner);
+ testExistingCluster(perUnitCount, varyingTokenCount,
+ new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand),
+ partitioner);
}
- testExistingCluster(perUnitCount, fixedTokenCount, new FixedGroupCountReplicationStrategy(rf, rf * 2));
+ testExistingCluster(perUnitCount, fixedTokenCount,
+ new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner);
}
}
}
- public void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs)
+ public void testExistingCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner)
{
System.out.println("Testing existing cluster, target " + perUnitCount + " vnodes, replication " + rs);
final int targetClusterSize = TARGET_CLUSTER_SIZE;
NavigableMap<Token, Unit> tokenMap = Maps.newTreeMap();
- random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount);
+ random(tokenMap, rs, targetClusterSize / 2, tc, perUnitCount, partitioner);
ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
grow(t, targetClusterSize * 9 / 10, tc, perUnitCount, false);
grow(t, targetClusterSize, tc, perUnitCount, true);
- loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount);
+ loseAndReplace(t, targetClusterSize / 10, tc, perUnitCount, partitioner);
System.out.println();
}
@Test
- public void testNewCluster()
+ public void testNewClusterWithRandomPartitioner()
{
- Util.flakyTest(this::flakyTestNewCluster,
+ Util.flakyTest(this::flakyTestNewClusterWithRandomPartitioner,
5,
"It tends to fail sometimes due to the random selection of the tokens in the first few nodes.");
}
- public void flakyTestNewCluster()
+ @Test
+ public void testNewClusterWithMurmur3Partitioner()
+ {
+ Util.flakyTest(this::flakyTestNewClusterWithMurmur3Partitioner,
+ 5,
+ "It tends to fail sometimes due to the random selection of the tokens in the first few nodes.");
+ }
+
+ public void flakyTestNewClusterWithRandomPartitioner()
+ {
+ flakyTestNewCluster(new RandomPartitioner());
+ }
+
+ public void flakyTestNewClusterWithMurmur3Partitioner()
+ {
+ flakyTestNewCluster(new Murmur3Partitioner());
+ }
+
+ public void flakyTestNewCluster(IPartitioner partitioner)
{
// This test is flaky because the selection of the tokens for the first RF nodes (which is random, with an
// uncontrolled seed) can sometimes cause a pathological situation where the algorithm will find a (close to)
@@ -564,20 +599,24 @@ public class ReplicationAwareTokenAllocatorTest
{
for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
{
- testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf));
- testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf));
+ testNewCluster(perUnitCount, fixedTokenCount, new SimpleReplicationStrategy(rf), partitioner);
+ testNewCluster(perUnitCount, varyingTokenCount, new SimpleReplicationStrategy(rf), partitioner);
if (rf == 1) continue; // Replication strategy doesn't matter for RF = 1.
for (int groupSize = 4; groupSize <= 64 && groupSize * rf * 8 < TARGET_CLUSTER_SIZE; groupSize *= 4)
{
- testNewCluster(perUnitCount, fixedTokenCount, new BalancedGroupReplicationStrategy(rf, groupSize));
- testNewCluster(perUnitCount, varyingTokenCount, new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand));
+ testNewCluster(perUnitCount, fixedTokenCount,
+ new BalancedGroupReplicationStrategy(rf, groupSize), partitioner);
+ testNewCluster(perUnitCount, varyingTokenCount,
+ new UnbalancedGroupReplicationStrategy(rf, groupSize / 2, groupSize * 2, seededRand),
+ partitioner);
}
- testNewCluster(perUnitCount, fixedTokenCount, new FixedGroupCountReplicationStrategy(rf, rf * 2));
+ testNewCluster(perUnitCount, fixedTokenCount,
+ new FixedGroupCountReplicationStrategy(rf, rf * 2), partitioner);
}
}
}
- public void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs)
+ public void testNewCluster(int perUnitCount, TokenCount tc, TestReplicationStrategy rs, IPartitioner partitioner)
{
System.out.println("Testing new cluster, target " + perUnitCount + " vnodes, replication " + rs);
final int targetClusterSize = TARGET_CLUSTER_SIZE;
@@ -586,11 +625,12 @@ public class ReplicationAwareTokenAllocatorTest
ReplicationAwareTokenAllocator<Unit> t = new ReplicationAwareTokenAllocator<>(tokenMap, rs, partitioner);
grow(t, targetClusterSize * 2 / 5, tc, perUnitCount, false);
grow(t, targetClusterSize, tc, perUnitCount, true);
- loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount);
+ loseAndReplace(t, targetClusterSize / 5, tc, perUnitCount, partitioner);
System.out.println();
}
- private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany, TokenCount tc, int perUnitCount)
+ private void loseAndReplace(ReplicationAwareTokenAllocator<Unit> t, int howMany,
+ TokenCount tc, int perUnitCount, IPartitioner partitioner)
{
int fullCount = t.unitCount();
System.out.format("Losing %d units. ", howMany);