You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/08 15:29:13 UTC
[03/22] cassandra git commit: Fix pending range calculation during
moves
Fix pending range calculation during moves
patch by kohlisankalp; reviewed by blambov for CASSANDRA-10887
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/812df9e8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/812df9e8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/812df9e8
Branch: refs/heads/cassandra-2.2
Commit: 812df9e8bc3cb98258a70a4b34cd6e289ff95e27
Parents: 6d6d189
Author: sankalp kohli <ko...@gmail.com>
Authored: Tue Jan 5 15:09:06 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Jan 8 15:18:45 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/dht/Range.java | 21 +
.../service/PendingRangeCalculatorService.java | 36 +-
test/unit/org/apache/cassandra/Util.java | 4 +-
.../org/apache/cassandra/dht/RangeTest.java | 83 +++-
.../org/apache/cassandra/service/MoveTest.java | 435 +++++++++++++++++++
6 files changed, 557 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 14c5ee6..c167098 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.13
+ * Fix pending range calculation during moves (CASSANDRA-10887)
* Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-9708)
* Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
* Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 81c92a2..618a3f4 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -300,7 +300,28 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
return rhs.differenceToFetch(this);
}
+ /**
+  * Removes every range in {@code ranges} from this range.
+  * <p>
+  * Starts from a set holding only this range and, for each given range in turn, replaces every
+  * remaining piece with whatever {@link #subtract(Range)} leaves of it.
+  *
+  * @param ranges the ranges to take away from this range; the collection is not modified
+  * @return a new set holding the leftover pieces of this range (possibly empty)
+  */
+ public Set<Range<T>> subtractAll(Collection<Range<T>> ranges)
+ {
+     Set<Range<T>> result = new HashSet<>();
+     result.add(this);
+     for (Range<T> range : ranges)
+     {
+         // Once nothing is left, subtracting further ranges cannot bring anything back.
+         if (result.isEmpty())
+             break;
+         result = subtractFromEach(result, range);
+     }
+
+     return result;
+ }
+
+ /**
+  * Subtracts a single range from every member of a set of ranges.
+  *
+  * @param ranges the ranges to subtract from; the set is not modified
+  * @param subtract the range to remove from each member of {@code ranges}
+  * @return a new set collecting the leftovers of every member
+  */
+ private static <T extends RingPosition<T>> Set<Range<T>> subtractFromEach(Set<Range<T>> ranges, Range<T> subtract)
+ {
+     Set<Range<T>> result = new HashSet<>();
+     for (Range<T> range : ranges)
+         result.addAll(range.subtract(subtract));
+
+     return result;
+ }
/**
* Calculate set of the difference ranges of given two ranges
* (as current (A, B] and rhs is (C, D])
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 0ff8a92..1e7b7bd 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -169,18 +169,44 @@ public class PendingRangeCalculatorService
// At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
// We can now finish the calculation by checking moving and relocating nodes.
- // For each of the moving nodes, we do the same thing we did for bootstrapping:
- // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
{
+ //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+ Set<Range<Token>> moveAffectedRanges = new HashSet<>();
InetAddress endpoint = moving.right; // address of the moving node
+ //Add ranges before the move
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ {
+ moveAffectedRanges.add(range);
+ }
- // moving.left is a new token of the endpoint
allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
+ //Add ranges after the move
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
- pendingRanges.put(range, endpoint);
+ moveAffectedRanges.add(range);
+ }
+
+ for(Range<Token> range : moveAffectedRanges)
+ {
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+ for(final InetAddress address : difference)
+ {
+ Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+ Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+ //We want to get rid of any ranges which the node is currently getting.
+ newRanges.removeAll(oldRanges);
+
+ for(Range<Token> newRange : newRanges)
+ {
+ for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
+ {
+ pendingRanges.put(pendingRange, address);
+ }
+ }
+ }
}
allLeftMetadata.removeEndpoint(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e05468f..3c2d32c 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -252,9 +252,11 @@ public class Util
for (int i = hostIdPool.size(); i < howMany; i++)
hostIdPool.add(UUID.randomUUID());
+ boolean endpointTokenPrefilled = endpointTokens != null && !endpointTokens.isEmpty();
for (int i=0; i<howMany; i++)
{
- endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ if(!endpointTokenPrefilled)
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
keyTokens.add(new BigIntegerToken(String.valueOf(10 * i + 5)));
hostIds.add(hostIdPool.get(i));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 1d8123b..2083f53 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -19,17 +19,13 @@
package org.apache.cassandra.dht;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
import com.google.common.base.Joiner;
import static java.util.Arrays.asList;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;
@@ -191,12 +187,12 @@ public class RangeTest
Set<Range<T>> correct = Range.rangeSet(ranges);
Set<Range> result1 = one.intersectionWith(two);
assert result1.equals(correct) : String.format("%s != %s",
- StringUtils.join(result1, ","),
- StringUtils.join(correct, ","));
+ StringUtils.join(result1, ","),
+ StringUtils.join(correct, ","));
Set<Range> result2 = two.intersectionWith(one);
assert result2.equals(correct) : String.format("%s != %s",
- StringUtils.join(result2, ","),
- StringUtils.join(correct, ","));
+ StringUtils.join(result2, ","),
+ StringUtils.join(correct, ","));
}
private void assertNoIntersection(Range wraps1, Range nowrap3)
@@ -265,15 +261,15 @@ public class RangeTest
Range nowrap2 = new Range(new BigIntegerToken("0"), new BigIntegerToken("100"));
assertIntersection(wraps1,
- nowrap1,
- new Range(new BigIntegerToken("0"), new BigIntegerToken("10")),
- new Range(new BigIntegerToken("100"), new BigIntegerToken("200")));
+ nowrap1,
+ new Range(new BigIntegerToken("0"), new BigIntegerToken("10")),
+ new Range(new BigIntegerToken("100"), new BigIntegerToken("200")));
assertIntersection(wraps2,
- nowrap1,
- new Range(new BigIntegerToken("100"), new BigIntegerToken("200")));
+ nowrap1,
+ new Range(new BigIntegerToken("100"), new BigIntegerToken("200")));
assertIntersection(wraps1,
- nowrap2,
- new Range(new BigIntegerToken("0"), new BigIntegerToken("10")));
+ nowrap2,
+ new Range(new BigIntegerToken("0"), new BigIntegerToken("10")));
}
@Test
@@ -331,6 +327,59 @@ public class RangeTest
return new Range(new BigIntegerToken(token1), new BigIntegerToken(token2));
}
+ private Range<Token> makeRange(long token1, long token2)
+ {
+     // Wrap both raw longs as LongTokens, then pair them into a range.
+     Token left = new LongToken(token1);
+     Token right = new LongToken(token2);
+     return new Range<Token>(left, right);
+ }
+
+ /**
+  * Checks that {@code result} holds exactly the ranges described by {@code tokens}.
+  *
+  * @param result the ranges actually produced
+  * @param tokens the expected ranges as consecutive (left, right) token pairs
+  */
+ private void assertRanges(Set<Range<Token>> result, Long ... tokens)
+ {
+     assert tokens.length % 2 == 0 : "tokens must be given as (left, right) pairs, got " + tokens.length + " values";
+
+     final Set<Range<Token>> expected = new HashSet<>();
+     for (int i = 0; i < tokens.length; i += 2)
+     {
+         expected.add(makeRange(tokens[i], tokens[i + 1]));
+     }
+
+     // Report both sides on failure, like assertIntersection does in this file.
+     assert CollectionUtils.isEqualCollection(result, expected) : String.format("%s != %s", result, expected);
+ }
+
+ @Test
+ public void testSubtractAll()
+ {
+     // Start from the non-wrapping range (1, 100] and grow the set of subtracted ranges step by step.
+     Range<Token> range = makeRange(1L, 100L);
+     Collection<Range<Token>> subtracted = new HashSet<>();
+
+     // Cut off the head.
+     subtracted.add(makeRange(1L, 10L));
+     assertRanges(range.subtractAll(subtracted), 10L, 100L);
+
+     // Cut off the tail as well.
+     subtracted.add(makeRange(90L, 100L));
+     assertRanges(range.subtractAll(subtracted), 10L, 90L);
+
+     // Punch a hole in the middle.
+     subtracted.add(makeRange(54L, 60L));
+     assertRanges(range.subtractAll(subtracted), 10L, 54L, 60L, 90L);
+
+     // Overlap one of the remaining pieces and an already removed part.
+     subtracted.add(makeRange(80L, 95L));
+     assertRanges(range.subtractAll(subtracted), 10L, 54L, 60L, 80L);
+ }
+
+ @Test
+ public void testSubtractAllWithWrapAround()
+ {
+     // (100, 10] has its left token above its right token, i.e. it wraps around.
+     Range<Token> range = makeRange(100L, 10L);
+     Collection<Range<Token>> subtracted = new HashSet<>();
+
+     // A range lying entirely outside changes nothing.
+     subtracted.add(makeRange(20L, 30L));
+     assertRanges(range.subtractAll(subtracted), 100L, 10L);
+
+     subtracted.add(makeRange(200L, 500L));
+     assertRanges(range.subtractAll(subtracted), 100L, 200L, 500L, 10L);
+
+     subtracted.add(makeRange(1L, 10L));
+     assertRanges(range.subtractAll(subtracted), 100L, 200L, 500L, 1L);
+
+     subtracted.add(makeRange(0L, 1L));
+     assertRanges(range.subtractAll(subtracted), 100L, 200L, 500L, 0L);
+
+     subtracted.add(makeRange(1000L, 0L));
+     assertRanges(range.subtractAll(subtracted), 100L, 200L, 500L, 1000L);
+ }
+
private Set<Range> makeRanges(String[][] tokenPairs)
{
Set<Range> ranges = new HashSet<Range>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/812df9e8/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 821fff0..49e3391 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -27,7 +27,13 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import static org.junit.Assert.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -60,6 +66,9 @@ public class MoveTest
{
oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
SchemaLoader.loadSchema();
+ addNetworkTopologyKeyspace(Network_11_KeyspaceName, 1, 1);
+ addNetworkTopologyKeyspace(Network_22_KeyspaceName, 2, 2);
+ addNetworkTopologyKeyspace(Network_33_KeyspaceName, 3, 3);
}
@AfterClass
@@ -69,6 +78,430 @@ public class MoveTest
SchemaLoader.stopGossiper();
}
+ //Simple Strategy Keyspaces with RF1, 2 and 3
+ private static final String Simple_RF1_KeyspaceName = "Keyspace6";
+ private static final String Simple_RF2_KeyspaceName = "Keyspace5";
+ private static final String Simple_RF3_KeyspaceName = "Keyspace4";
+ //Network Strategy Keyspace with RF DC1=1 and DC2=1 and so on.
+ private static final String Network_11_KeyspaceName = "Network11";
+ private static final String Network_22_KeyspaceName = "Network22";
+ private static final String Network_33_KeyspaceName = "Network33";
+
+ private static void addNetworkTopologyKeyspace(String keyspaceName, Integer... replicas) throws ConfigurationException
+ {
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+ {
+ //Odd IPs are in DC1 and Even are in DC2. Endpoints upto .14 will have unique racks and
+ // then will be same for a set of three.
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ int ipLastPart = getIPLastPart(endpoint);
+ if (ipLastPart <= 14)
+ return UUID.randomUUID().toString();
+ else
+ return "RAC" + (ipLastPart % 3);
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ if (getIPLastPart(endpoint) % 2 == 0)
+ return "DC2";
+ else
+ return "DC1";
+ }
+
+ private int getIPLastPart(InetAddress endpoint)
+ {
+ String str = endpoint.toString();
+ int index = str.lastIndexOf(".");
+ return Integer.parseInt(str.substring(index + 1).trim());
+ }
+ });
+
+ Class<? extends AbstractReplicationStrategy> strategy = NetworkTopologyStrategy.class;
+ KSMetaData keyspace = KSMetaData.testMetadata(keyspaceName, strategy, configOptions(replicas),
+ CFMetaData.sparseCFMetaData(keyspaceName, "CF1", BytesType.instance));
+ MigrationManager.announceNewKeyspace(keyspace);
+ }
+
+ /**
+  * Builds strategy options mapping "DC1", "DC2", ... to the given replication factors.
+  * Null entries are skipped and do not consume a DC number.
+  */
+ private static Map<String, String> configOptions(Integer[] replicas)
+ {
+     Map<String, String> options = new HashMap<>();
+     int dcNumber = 0;
+     for (int pos = 0; pos < replicas.length; pos++)
+     {
+         Integer replicationFactor = replicas[pos];
+         if (replicationFactor != null)
+         {
+             dcNumber++;
+             options.put("DC" + dcNumber, String.valueOf(replicationFactor));
+         }
+     }
+     return options;
+ }
+
+ @Test
+ public void testMoveWithPendingRangesNetworkStrategyRackAwareThirtyNodes() throws Exception // checks pending ranges of the Network33 keyspace while one node moves
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 60; // 60 endpoints overall; "ThirtyNodes" presumably refers to 30 per DC (test snitch: odd IPs -> DC1, even -> DC2)
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ for(int i=0; i < RING_SIZE/2; i++) // pre-fill tokens in adjacent pairs: 0,1,10,11,20,21,...
+ {
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1)));
+ }
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ //Moving endpoint 127.0.0.37 (hosts index 36, rack RAC1 since 37 % 3 == 1) from its current token 180 to 215
+ int MOVING_NODE = 36;
+ moveHost(hosts.get(MOVING_NODE), 215, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(150, 151, "127.0.0.43"),
+ generatePendingMapEntry(151, 160, "127.0.0.43"),generatePendingMapEntry(160, 161, "127.0.0.43"),
+ generatePendingMapEntry(161, 170, "127.0.0.43"), generatePendingMapEntry(170, 171, "127.0.0.43"),
+ generatePendingMapEntry(171, 180, "127.0.0.43"), generatePendingMapEntry(210, 211, "127.0.0.37"),
+ generatePendingMapEntry(211, 215, "127.0.0.37")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 215, tmd);
+
+ //Moving it back to its original spot; pending ranges are not asserted for the return move
+ moveHost(hosts.get(MOVING_NODE), 180, tmd, valueFactory);
+ finishMove(hosts.get(MOVING_NODE), 180, tmd);
+
+ }
+
+ @Test
+ public void testMoveWithPendingRangesNetworkStrategyTenNode() throws Exception // moves nodes 0, 1 and 3 out and back, checking Network11/22/33 pending ranges each time
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 14; // note: 14 endpoints despite "TenNode" in the name; presumably all IPs are <= .14, where the test snitch returns a random rack per call
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ for(int i=0; i < RING_SIZE/2; i++) // pre-fill tokens in adjacent pairs: 0,1,10,11,...,60,61
+ {
+ endpointTokens.add(new BigIntegerToken(String.valueOf(10 * i)));
+ endpointTokens.add(new BigIntegerToken(String.valueOf((10 * i) + 1)));
+ }
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ int MOVING_NODE = 0; // first moving node: token 0 -> 5 and back
+ moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.1"),
+ generatePendingMapEntry(1, 5, "127.0.0.1")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.3"),
+ generatePendingMapEntry(1, 5, "127.0.0.3")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.5"),
+ generatePendingMapEntry(1, 5, "127.0.0.5")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 1, "127.0.0.7"),
+ generatePendingMapEntry(1, 5, "127.0.0.7")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ MOVING_NODE = 1; // second moving node: token 1 -> 5 and back
+ moveHost(hosts.get(MOVING_NODE), 5, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.2")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 5, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.4")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.6")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 5, "127.0.0.8")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1, tmd);
+
+ MOVING_NODE = 3; // third moving node: token 11 -> 25 (past other nodes' tokens) and back
+ moveHost(hosts.get(MOVING_NODE), 25, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.6"),
+ generatePendingMapEntry(10, 11, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.6"),
+ generatePendingMapEntry(0, 1, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"),
+ generatePendingMapEntry(11, 20, "127.0.0.4"),generatePendingMapEntry(20, 21, "127.0.0.4")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.6"),
+ generatePendingMapEntry(60, 61, "127.0.0.6"), generatePendingMapEntry(21, 25, "127.0.0.4"),
+ generatePendingMapEntry(11, 20, "127.0.0.4"), generatePendingMapEntry(20, 21, "127.0.0.4")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 25, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 11, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1, 10, "127.0.0.4"),
+ generatePendingMapEntry(10, 11, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.8")), Network_11_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(61, 0, "127.0.0.4"),
+ generatePendingMapEntry(0, 1, "127.0.0.4"), generatePendingMapEntry(11, 20, "127.0.0.8"),
+ generatePendingMapEntry(20, 21, "127.0.0.8"), generatePendingMapEntry(21, 25, "127.0.0.10")), Network_22_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(51, 60, "127.0.0.4"),
+ generatePendingMapEntry(60, 61, "127.0.0.4"), generatePendingMapEntry(21, 25, "127.0.0.12"),
+ generatePendingMapEntry(11, 20, "127.0.0.10"), generatePendingMapEntry(20, 21, "127.0.0.10")), Network_33_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 11, tmd);
+ }
+
+ @Test
+ public void testMoveWithPendingRangesSimpleStrategyTenNode() throws Exception // moves node 0 around a 10-node ring, checking the RF1/RF2/RF3 SimpleStrategy keyspaces
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 10;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); // endpointTokens is empty here, so createInitialRing fills in tokens 0, 10, ..., 90
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ final int MOVING_NODE = 0; // index of the moving node
+ moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory); // small move that stays before the next node's token
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1000, tmd, valueFactory); // move beyond the highest token in the ring
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1000, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1000, 0, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 35, tmd, valueFactory); // move past several other nodes' tokens
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(90, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(80, 90, "127.0.0.2"), generatePendingMapEntry(90, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 35, "127.0.0.1"), generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(80, 90, "127.0.0.3"), generatePendingMapEntry(90, 0, "127.0.0.4"),
+ generatePendingMapEntry(10, 20, "127.0.0.1"), generatePendingMapEntry(70, 80, "127.0.0.2")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 35, tmd);
+
+ }
+
+ @Test
+ public void testMoveWithPendingRangesForSimpleStrategyFourNode() throws Exception // moves nodes 0 and 3 around a 4-node ring, checking the RF1/RF2/RF3 SimpleStrategy keyspaces
+ {
+ StorageService ss = StorageService.instance;
+ final int RING_SIZE = 4;
+
+ TokenMetadata tmd = ss.getTokenMetadata();
+ tmd.clearUnsafe();
+ VersionedValue.VersionedValueFactory valueFactory = new VersionedValue.VersionedValueFactory(partitioner);
+ ArrayList<Token> endpointTokens = new ArrayList<>();
+ ArrayList<Token> keyTokens = new ArrayList<>();
+ List<InetAddress> hosts = new ArrayList<>();
+ List<UUID> hostIds = new ArrayList<>();
+
+ Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, RING_SIZE); // endpointTokens is empty here, so createInitialRing fills in tokens 0, 10, 20, 30
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ int MOVING_NODE = 0; // index of the moving node
+ moveHost(hosts.get(MOVING_NODE), 2, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 2, tmd);
+
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 2, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 1500, tmd, valueFactory); // move beyond the highest token in the ring
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 1500, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(1500, 0, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 15, tmd, valueFactory); // move past one other node's token
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(10, 15, "127.0.0.1"), generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.2"), generatePendingMapEntry(10, 15, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.2"),
+ generatePendingMapEntry(0, 10, "127.0.0.1")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 15, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+ generatePendingMapEntry(10, 15, "127.0.0.3")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 30, "127.0.0.1"),
+ generatePendingMapEntry(10, 15, "127.0.0.4"), generatePendingMapEntry(0, 10, "127.0.0.3")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(15, 20, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 26, tmd, valueFactory); // move past two other nodes' tokens
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.1"),
+ generatePendingMapEntry(30, 0, "127.0.0.2")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.2"),
+ generatePendingMapEntry(30, 0, "127.0.0.3"), generatePendingMapEntry(10, 20, "127.0.0.1")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(0, 10, "127.0.0.1"),
+ generatePendingMapEntry(26, 30, "127.0.0.3")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 26, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 0, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(20, 26, "127.0.0.4"),
+ generatePendingMapEntry(30, 0, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 0, "127.0.0.1"),
+ generatePendingMapEntry(26, 30, "127.0.0.1"), generatePendingMapEntry(10, 20, "127.0.0.4")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(26, 30, "127.0.0.1"),
+ generatePendingMapEntry(0, 10, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 0, tmd);
+
+ MOVING_NODE = 3; // switch to the node that owns the highest token (30)
+
+ moveHost(hosts.get(MOVING_NODE), 33, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.4")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 33, tmd);
+
+ moveHost(hosts.get(MOVING_NODE), 30, tmd, valueFactory);
+
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.1")), Simple_RF1_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.2")), Simple_RF2_KeyspaceName);
+ assertPendingRanges(tmd, generatePendingRanges(generatePendingMapEntry(30, 33, "127.0.0.3")), Simple_RF3_KeyspaceName);
+
+ finishMove(hosts.get(MOVING_NODE), 30, tmd);
+ }
+
+ /**
+  * Feeds StorageService a STATUS change built by {@code valueFactory.moving} for the target token,
+  * waits for the pending range calculation to finish and checks the host is flagged as moving.
+  */
+ private void moveHost(InetAddress host, int token, TokenMetadata tmd, VersionedValue.VersionedValueFactory valueFactory )
+ {
+     BigIntegerToken targetToken = new BigIntegerToken(String.valueOf(token));
+     VersionedValue movingStatus = valueFactory.moving(targetToken);
+     StorageService.instance.onChange(host, ApplicationState.STATUS, movingStatus);
+     PendingRangeCalculatorService.instance.blockUntilFinished();
+     assertTrue(tmd.isMoving(host));
+ }
+
+ /**
+  * Completes a move directly on the token metadata: clears the moving flag for {@code host},
+  * verifies it is cleared, then records {@code token} as the host's normal token.
+  */
+ private void finishMove(InetAddress host, int token, TokenMetadata tmd)
+ {
+     tmd.removeFromMoving(host);
+     assertFalse(tmd.isMoving(host));
+     BigIntegerToken finalToken = new BigIntegerToken(String.valueOf(token));
+     tmd.updateNormalToken(finalToken, host);
+ }
+
+ /**
+  * Builds one expected (range -> endpoints) entry from plain token values and address strings.
+  */
+ private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException
+ {
+     // A throwaway single-entry map serves only as a way to obtain a Map.Entry instance.
+     Map<Range<Token>, Collection<InetAddress>> holder = new HashMap<>();
+     Range<Token> range = generateRange(start, end);
+     Collection<InetAddress> addresses = makeAddrs(endpoints);
+     holder.put(range, addresses);
+     return holder.entrySet().iterator().next();
+ }
+
+ /**
+  * Collects the given entries into a single map; a later entry replaces an earlier one with an equal range.
+  */
+ private Map<Range<Token>, Collection<InetAddress>> generatePendingRanges(Map.Entry<Range<Token>, Collection<InetAddress>>... entries)
+ {
+     Map<Range<Token>, Collection<InetAddress>> merged = new HashMap<>();
+     for (int i = 0; i < entries.length; i++)
+         merged.put(entries[i].getKey(), entries[i].getValue());
+     return merged;
+ }
+
+ /**
+  * Compares the expected pending ranges with those the token metadata reports for {@code keyspaceName},
+  * and fails if that keyspace is not among the non-system keyspaces of the schema.
+  */
+ private void assertPendingRanges(TokenMetadata tmd, Map<Range<Token>, Collection<InetAddress>> pendingRanges, String keyspaceName) throws ConfigurationException
+ {
+     boolean keyspaceFound = false;
+     for (String candidate : Schema.instance.getNonSystemKeyspaces())
+     {
+         if (keyspaceName.equals(candidate))
+         {
+             assertMaps(pendingRanges, tmd.getPendingRanges(keyspaceName));
+             keyspaceFound = true;
+         }
+     }
+
+     assert keyspaceFound;
+ }
+
+ /**
+  * Asserts both maps have the same size and that every expected range is present in {@code actual}
+  * with the same endpoints in the same iteration order.
+  */
+ private void assertMaps(Map<Range<Token>, Collection<InetAddress>> expected, Map<Range<Token>, Collection<InetAddress>> actual)
+ {
+     assertEquals(expected.size(), actual.size());
+     for (Range<Token> range : expected.keySet())
+     {
+         Collection<InetAddress> actualEndpoints = actual.get(range);
+         assertNotNull(actualEndpoints);
+         // Copy both sides into lists so the comparison is element by element.
+         assertEquals(new ArrayList<>(expected.get(range)), new ArrayList<>(actualEndpoints));
+     }
+ }
+
/*
* Test whether write endpoints is correct when the node is moving. Uses
* StorageService.onChange and does not manipulate token metadata directly.
@@ -116,6 +549,8 @@ public class MoveTest
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
strategy = getStrategy(keyspaceName, tmd);
+ if(strategy instanceof NetworkTopologyStrategy)
+ continue;
int numMoved = 0;
for (Token token : keyTokens)
{