You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/08 15:29:20 UTC
[10/22] cassandra git commit: Fix pending range calculation during
moves (2.2 version)
Fix pending range calculation during moves (2.2 version)
patch by kohlisankalp; reviewed by blambov for CASSANDRA-10887
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/23123f04
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/23123f04
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/23123f04
Branch: refs/heads/cassandra-3.0
Commit: 23123f04fd5e5381742b2bae16bb3e03225598c3
Parents: 44a0578
Author: sankalp kohli <ko...@gmail.com>
Authored: Thu Jan 7 16:21:47 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jan 8 15:23:30 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/dht/Range.java | 21 +
.../apache/cassandra/locator/TokenMetadata.java | 34 +-
test/unit/org/apache/cassandra/Util.java | 4 +-
.../org/apache/cassandra/dht/RangeTest.java | 55 +++
.../org/apache/cassandra/service/MoveTest.java | 491 ++++++++++++++++++-
6 files changed, 576 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a26f9e0..e5c4430 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -14,6 +14,7 @@
* Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
* Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
Merged from 2.1:
+ * Fix pending range calculation during moves (CASSANDRA-10887)
* Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-9708)
* Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
* Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 9893531..f2c5996 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -292,7 +292,28 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return rhs.differenceToFetch(this);
}
+ public Set<Range<T>> subtractAll(Collection<Range<T>> ranges)
+ {
+ Set<Range<T>> result = new HashSet<>();
+ result.add(this);
+ for(Range<T> range : ranges)
+ {
+ result = substractAllFromToken(result, range);
+ }
+
+ return result;
+ }
+ private static <T extends RingPosition<T>> Set<Range<T>> substractAllFromToken(Set<Range<T>> ranges, Range<T> subtract)
+ {
+ Set<Range<T>> result = new HashSet<>();
+ for(Range<T> range : ranges)
+ {
+ result.addAll(range.subtract(subtract));
+ }
+
+ return result;
+ }
/**
* Calculate set of the difference ranges of given two ranges
* (as current (A, B] and rhs is (C, D])
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 00d8ee9..de16fda 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -799,14 +799,42 @@ public class TokenMetadata
// simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
for (Pair<Token, InetAddress> moving : movingEndpoints)
{
+ //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
+ Set<Range<Token>> moveAffectedRanges = new HashSet<>();
InetAddress endpoint = moving.right; // address of the moving node
+ //Add ranges before the move
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ {
+ moveAffectedRanges.add(range);
+ }
- // moving.left is a new token of the endpoint
allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
+ //Add ranges after the move
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
- newPendingRanges.addPendingRange(range, endpoint);
+ moveAffectedRanges.add(range);
+ }
+
+ for(Range<Token> range : moveAffectedRanges)
+ {
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+ for(final InetAddress address : difference)
+ {
+ Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+ Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+ //We want to get rid of any ranges which the node is currently getting.
+ newRanges.removeAll(oldRanges);
+
+ for(Range<Token> newRange : newRanges)
+ {
+ for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
+ {
+ newPendingRanges.addPendingRange(pendingRange, address);
+ }
+ }
+ }
}
allLeftMetadata.removeEndpoint(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index da81aaa..91aa5fd 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -273,9 +273,11 @@ public class Util
for (int i = hostIdPool.size(); i < howMany; i++)
hostIdPool.add(UUID.randomUUID());
+ boolean endpointTokenPrefilled = endpointTokens != null && !endpointTokens.isEmpty();
for (int i=0; i<howMany; i++)
{
- endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ if(!endpointTokenPrefilled)
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
hostIds.add(hostIdPool.get(i));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 85f2586..4255487 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.dht;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -30,6 +31,7 @@ import com.google.common.base.Joiner;
import static java.util.Arrays.asList;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
import org.apache.cassandra.db.RowPosition;
@@ -329,6 +331,59 @@ public class RangeTest
assert t1.compareTo(t4) == 0;
}
+ private Range<Token> makeRange(long token1, long token2)
+ {
+ return new Range<Token>(new Murmur3Partitioner.LongToken(token1), new Murmur3Partitioner.LongToken(token2));
+ }
+
+ private void assertRanges(Set<Range<Token>> result, Long ... tokens)
+ {
+ assert tokens.length % 2 ==0;
+
+ final Set<Range<Token>> expected = new HashSet<>();
+ for(int i=0; i < tokens.length; i+=2)
+ {
+ expected.add(makeRange(tokens[i], tokens[i+1]));
+ }
+
+ assert CollectionUtils.isEqualCollection(result, expected);
+
+ }
+
+ @Test
+ public void testSubtractAll()
+ {
+ Range<Token> range = new Range<Token>(new Murmur3Partitioner.LongToken(1L), new Murmur3Partitioner.LongToken(100L));
+
+ Collection<Range<Token>> collection = new HashSet<>();
+ collection.add(makeRange(1L, 10L));
+ assertRanges(range.subtractAll(collection), 10L, 100L);
+ collection.add(makeRange(90L, 100L));
+ assertRanges(range.subtractAll(collection), 10L, 90L);
+ collection.add(makeRange(54L, 60L));
+ assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 90L);
+ collection.add(makeRange(80L, 95L));
+ assertRanges(range.subtractAll(collection), 10L, 54L, 60L, 80L);
+ }
+
+ @Test
+ public void testSubtractAllWithWrapAround()
+ {
+ Range<Token> range = new Range<Token>(new Murmur3Partitioner.LongToken(100L), new Murmur3Partitioner.LongToken(10L));
+
+ Collection<Range<Token>> collection = new HashSet<>();
+ collection.add(makeRange(20L, 30L));
+ assertRanges(range.subtractAll(collection), 100L, 10L);
+ collection.add(makeRange(200L, 500L));
+ assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 10L);
+ collection.add(makeRange(1L, 10L));
+ assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1L);
+ collection.add(makeRange(0L, 1L));
+ assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 0L);
+ collection.add(makeRange(1000L, 0));
+ assertRanges(range.subtractAll(collection), 100L, 200L, 500L, 1000L);
+ }
+
private Range<Token> makeRange(String token1, String token2)
{
return new Range<Token>(new BigIntegerToken(token1), new BigIntegerToken(token2));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/23123f04/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 6c9e589..bd4317d 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -27,7 +27,13 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import static org.junit.Assert.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
+import org.apache.cassandra.locator.PendingRangeMaps;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -50,10 +56,17 @@ public class MoveTest
{
private static final IPartitioner partitioner = RandomPartitioner.instance;
private static IPartitioner oldPartitioner;
- private static final String KEYSPACE1 = "MoveTestKeyspace1";
+ //Simple Strategy Keyspaces
+ private static final String Simple_RF1_KeyspaceName = "MoveTestKeyspace1";
+ private static final String Simple_RF2_KeyspaceName = "MoveTestKeyspace5";
+ private static final String Simple_RF3_KeyspaceName = "MoveTestKeyspace4";
private static final String KEYSPACE2 = "MoveTestKeyspace2";
private static final String KEYSPACE3 = "MoveTestKeyspace3";
- private static final String KEYSPACE4 = "MoveTestKeyspace4";
+
+ //Network Strategy Keyspace with RF DC1=1 and DC2=1 and so on.
+ private static final String Network_11_KeyspaceName = "MoveTestNetwork11";
+ private static final String Network_22_KeyspaceName = "MoveTestNetwork22";
+ private static final String Network_33_KeyspaceName = "MoveTestNetwork33";
/*
* NOTE: the tests above uses RandomPartitioner, which is not the default
@@ -67,6 +80,9 @@ public class MoveTest
oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
SchemaLoader.loadSchema();
SchemaLoader.schemaDefinition("MoveTest");
+ addNetworkTopologyKeyspace(Network_11_KeyspaceName, 1, 1);
+ addNetworkTopologyKeyspace(Network_22_KeyspaceName, 2, 2);
+ addNetworkTopologyKeyspace(Network_33_KeyspaceName, 3, 3);
}
@AfterClass
@@ -82,6 +98,427 @@ public class MoveTest
StorageService.instance.getTokenMetadata().clearUnsafe();
}
+ private static void addNetworkTopologyKeyspace(String keyspaceName, Integer... replicas) throws ConfigurationException
+ {
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+ {
+ //Odd IPs are in DC1 and Even are in DC2. Endpoints upto .14 will have unique racks and
+ // then will be same for a set of three.
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ int ipLastPart = getIPLastPart(endpoint);
+ if (ipLastPart <= 14)
+ return UUID.randomUUID().toString();
+ else
+ return "RAC" + (ipLastPart % 3);
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ if (getIPLastPart(endpoint) % 2 == 0)
+ return "DC2";
+ else
+ return "DC1";
+ }
+
+ private int getIPLastPart(InetAddress endpoint)
+ {
+ String str = endpoint.toString();
+ int index = str.lastIndexOf(".");
+ return Integer.parseInt(str.substring(index + 1).trim());
+ }
+ });
+
+ Class<? extends AbstractReplicationStrategy> strategy = NetworkTopologyStrategy.class;
+ KSMetaData keyspace = KSMetaData.testMetadata(keyspaceName, strategy, configOptions(replicas),
+ CFMetaData.denseCFMetaData(keyspaceName, "CF1", BytesType.instance));
+ MigrationManager.announceNewKeyspace(keyspace);
+ }
+
+ private static Map<String, String> configOptions(Integer[] replicas)
+ {
+ Map<String, String> configOptions = new HashMap<>();
+ int i = 1;
+ for(Integer replica : replicas)
+ {
+ if(replica == null)
+ continue;
+ configOptions.put("DC" + i++, String.valueOf(replica));
+ }
+ return configOptions;
+ }
+
+ @Test
+ public void testMoveWithPendingRangesNetworkStrategyRackAwareThirtyNodes() throws Exception
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 60;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ for(int i=0; i < RING_SIZE/2; i++)
+ {
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1)));
+ }
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ //Moving Endpoint 127.0.0.37 in RAC1 with current token 180
+ int MOVING_NODE = 36;
+ moveHost(hosts.get(MOVING_NODE), 215, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(150, 151, "127.0.0.43"),
+ generatePendingMapEntry(151, 160, "127.0.0.43"),generatePendingMapEntry(160, 161, "127.0.0.43"),
+ generatePendingMapEntry(161, 170, "127.0.0.43"), generatePendingMapEntry(170, 171, "127.0.0.43"),
+ generatePendingMapEntry(171, 180, "127.0.0.43"), generatePendingMapEntry(210, 211, "127.0.0.37"),
+ generatePendingMapEntry(211, 215, "127.0.0.37")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 215, tmd);
+
+ //Moving it back to original spot
+ moveHost(hosts.get(MOVING_NODE), 180, tmd, valueFactory);
+ finishMove(hosts.get(MOVING_NODE), 180, tmd);
+
+ }
+
+ @Test
+ public void testMoveWithPendingRangesNetworkStrategyTenNode() throws Exception
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 14;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ for(int i=0; i < RING_SIZE/2; i++)
+ {
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1)));
+ }
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ int MOVING_NODE = 0;
+ moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.3"),
+ generatePendingMapEntry(1, 5, "127.0.0.3")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.5"),
+ generatePendingMapEntry(1, 5, "127.0.0.5")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.7"),
+ generatePendingMapEntry(1, 5, "127.0.0.7")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ MOVING_NODE = 1;
+ moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.4")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.6")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.8")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1, tmd);
+
+ MOVING_NODE = 3;
+ moveHost(hosts.get(MOVING_NODE), 25, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.6"),
+ generatePendingMapEntry(10, 11, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.6"),
+ generatePendingMapEntry(0, 1, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"),
+ generatePendingMapEntry(11, 20, "127.0.0.4"),generatePendingMapEntry(20, 21, "127.0.0.4")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.6"),
+ generatePendingMapEntry(60, 61, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"),
+ generatePendingMapEntry(11, 20, "127.0.0.4"), generatePendingMapEntry(20, 21, "127.0.0.4")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 25, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 11, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.4"),
+ generatePendingMapEntry(10, 11, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.8")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.4"),
+ generatePendingMapEntry(0, 1, "127.0.0.4"), generatePendingMapEntry(11, 20, "127.0.0.8"),
+ generatePendingMapEntry(20, 21, "127.0.0.8"), generatePendingMapEntry(21, 25, "127.0.0.10")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.4"),
+ generatePendingMapEntry(60, 61, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.12"),
+ generatePendingMapEntry(11, 20, "127.0.0.10"), generatePendingMapEntry(20, 21, "127.0.0.10")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 11, tmd);
+ }
+
+ @Test
+ public void testMoveWithPendingRangesSimpleStrategyTenNode() throws Exception
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 10;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ final int MOVING_NODE = 0; // index of the moving node
+ moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1000, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1000, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 35, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(90, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(80, 90, "127.0.0.2"), generatePendingMapEntry(90, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(80, 90, "127.0.0.3"), generatePendingMapEntry(90, 0, "127.0.0.4"),
+ generatePendingMapEntry(10, 20, "127.0.0.1"), generatePendingMapEntry(70, 80, "127.0.0.2")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 35, tmd);
+
+ }
+
+ @Test
+ public void testMoveWithPendingRangesForSimpleStrategyFourNode() throws Exception
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 4;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ int MOVING_NODE = 0; // index of the moving node
+ moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1500, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1500, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 15, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(10, 15, "127.0.0.1"), generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.2"), generatePendingMapEntry(10, 15, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.2"),
+ generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 15, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+ generatePendingMapEntry(10, 15, "127.0.0.3")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(10, 15, "127.0.0.4"), generatePendingMapEntry(0, 10, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 26, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.1"),
+ generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.2"),
+ generatePendingMapEntry(30, 0, "127.0.0.3"), generatePendingMapEntry(10, 20, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 10, "127.0.0.1"),
+ generatePendingMapEntry(26, 30, "127.0.0.3")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 26, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.4"),
+ generatePendingMapEntry(30, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+ generatePendingMapEntry(26, 30, "127.0.0.1"), generatePendingMapEntry(10, 20, "127.0.0.4")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ MOVING_NODE = 3;
+
+ moveHost(hosts.get(MOVING_NODE), 33, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 33, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 30, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.2")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.3")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 30, tmd);
+ }
+
+ private void moveHost(InetAddress host, int token, TokenMetadata tmd, VersionedValue.VersionedValueFactory valueFactory )
+ {
+ StorageService.instance.onChange(host, ApplicationState.STATUS, valueFactory.moving(new BigIntegerToken(String.valueOf(token))));
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+ assertTrue(tmd.isMoving(host));
+ }
+
+ private void finishMove(InetAddress host, int token, TokenMetadata tmd)
+ {
+ tmd.removeFromMoving(host);
+ assertTrue(!tmd.isMoving(host));
+ tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+ }
+
+ private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
+ {
+ Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>();
+ pendingRanges.put(generateRange(start, end), makeAddrs(endpoints));
+ return pendingRanges.entrySet().iterator().next();
+ }
+
+ private Map<Range<Token>, Collection<InetAddress>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddress>>... entries)
+ {
+ Map<Range<Token>, Collection<InetAddress>> pendingRanges = new HashMap<>();
+ for(Map.Entry<Range<Token>, Collection<InetAddress>> entry : entries)
+ {
+ pendingRanges.put(entry.getKey(), entry.getValue());
+ }
+ return pendingRanges;
+ }
+
+ private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddress>> pendingRanges, String keyspaceName) throws ConfigurationException
+ {
+ boolean keyspaceFound = false;
+ for (String nonSystemKeyspaceName : Schema.instance.getNonSystemKeyspaces())
+ {
+ if(!keyspaceName.equals(nonSystemKeyspaceName))
+ continue;
+ assertMaps(pendingRanges, tmd.getPendingRanges(keyspaceName));
+ keyspaceFound = true;
+ }
+
+ assert keyspaceFound;
+ }
+
+ private void assertMaps(Map<Range<Token>, Collection<InetAddress>> expected, PendingRangeMaps actual)
+ {
+ int sizeOfActual = 0;
+ Iterator<Map.Entry<Range<Token>, List<InetAddress>>> iterator = actual.iterator();
+ while(iterator.hasNext())
+ {
+ Map.Entry<Range<Token>, List<InetAddress>> actualEntry = iterator.next();
+ assertNotNull(expected.get(actualEntry.getKey()));
+ assertEquals(new HashSet<>(expected.get(actualEntry.getKey())), new HashSet<>(actualEntry.getValue()));
+ sizeOfActual++;
+ }
+
+ assertEquals(expected.size(), sizeOfActual);
+ }
+
/*
* Test whether write endpoints is correct when the node is moving. Uses
* StorageService.onChange and does not manipulate token metadata directly.
@@ -128,6 +565,8 @@ public class MoveTest
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
strategy = getStrategy(keyspaceName, tmd);
+ if(strategy instanceof NetworkTopologyStrategy)
+ continue;
int numMoved = 0;
for (Token token : keyTokens)
{
@@ -235,7 +674,7 @@ public class MoveTest
* }
*/
- Multimap<InetAddress, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(KEYSPACE1).getAddressRanges();
+ Multimap<InetAddress, Range<Token>> keyspace1ranges = keyspaceStrategyMap.get(Simple_RF1_KeyspaceName).getAddressRanges();
Collection<Range<Token>> ranges1 = keyspace1ranges.get(InetAddress.getByName("127.0.0.1"));
assertEquals(1, collectionSize(ranges1));
assertEquals(generateRange(97, 0), ranges1.iterator().next());
@@ -332,7 +771,7 @@ public class MoveTest
* /127.0.0.10=[(70,87], (87,97], (67,70]]
* }
*/
- Multimap<InetAddress, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(KEYSPACE4).getAddressRanges();
+ Multimap<InetAddress, Range<Token>> keyspace4ranges = keyspaceStrategyMap.get(Simple_RF3_KeyspaceName).getAddressRanges();
ranges1 = keyspace4ranges.get(InetAddress.getByName("127.0.0.1"));
assertEquals(collectionSize(ranges1), 3);
assertTrue(ranges1.equals(generateRanges(97, 0, 70, 87, 87, 97)));
@@ -366,17 +805,17 @@ public class MoveTest
// pre-calculate the results.
Map<String, Multimap<Token, InetAddress>> expectedEndpoints = new HashMap<String, Multimap<Token, InetAddress>>();
- expectedEndpoints.put(KEYSPACE1, HashMultimap.<Token, InetAddress>create());
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.1.1"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9"));
- expectedEndpoints.get(KEYSPACE1).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10"));
+ expectedEndpoints.put(Simple_RF1_KeyspaceName, HashMultimap.<Token, InetAddress>create());
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.1.1"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.1.2"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9"));
+ expectedEndpoints.get(Simple_RF1_KeyspaceName).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10"));
expectedEndpoints.put(KEYSPACE2, HashMultimap.<Token, InetAddress>create());
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2"));
expectedEndpoints.get(KEYSPACE2).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3"));
@@ -399,17 +838,17 @@ public class MoveTest
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.1.2"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3"));
expectedEndpoints.get(KEYSPACE3).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"));
- expectedEndpoints.put(KEYSPACE4, HashMultimap.<Token, InetAddress>create());
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.1"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.1", "127.0.1.2"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1"));
- expectedEndpoints.get(KEYSPACE4).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2"));
+ expectedEndpoints.put(Simple_RF3_KeyspaceName, HashMultimap.<Token, InetAddress>create());
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("5"), makeAddrs("127.0.0.2", "127.0.0.3", "127.0.0.4"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("15"), makeAddrs("127.0.0.3", "127.0.0.4", "127.0.0.5"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("25"), makeAddrs("127.0.0.4", "127.0.0.5", "127.0.0.6"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("35"), makeAddrs("127.0.0.5", "127.0.0.6", "127.0.0.7", "127.0.1.1"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("45"), makeAddrs("127.0.0.6", "127.0.0.7", "127.0.0.8", "127.0.1.1"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("55"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.1", "127.0.1.2"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("65"), makeAddrs("127.0.0.7", "127.0.0.8", "127.0.0.9", "127.0.1.2"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("75"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1", "127.0.1.2"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("85"), makeAddrs("127.0.0.9", "127.0.0.10", "127.0.0.1"));
+ expectedEndpoints.get(Simple_RF3_KeyspaceName).putAll(new BigIntegerToken("95"), makeAddrs("127.0.0.10", "127.0.0.1", "127.0.0.2"));
for (Map.Entry<String, AbstractReplicationStrategy> keyspaceStrategy : keyspaceStrategyMap.entrySet())
{